-rw-r--r--  jstests/noPassthrough/libs/index_build.js                  | 141
-rw-r--r--  jstests/noPassthrough/resumable_index_build_initialized.js |  28
-rw-r--r--  jstests/replsets/libs/rollback_resumable_index_build.js    |  51
-rw-r--r--  src/mongo/db/catalog/collection.h                          |   3
-rw-r--r--  src/mongo/db/catalog/collection_impl.cpp                   |   6
-rw-r--r--  src/mongo/db/catalog/collection_impl.h                     |   3
-rw-r--r--  src/mongo/db/catalog/collection_mock.h                     |   3
-rw-r--r--  src/mongo/db/catalog/index_builds_manager.cpp              |   5
-rw-r--r--  src/mongo/db/catalog/index_builds_manager.h                |   3
-rw-r--r--  src/mongo/db/catalog/multi_index_block.cpp                 | 106
-rw-r--r--  src/mongo/db/catalog/multi_index_block.h                   |   5
-rw-r--r--  src/mongo/db/index/index_access_method.cpp                 |  14
-rw-r--r--  src/mongo/db/index_builds_coordinator.cpp                  |  38
-rw-r--r--  src/mongo/db/index_builds_coordinator.h                    |   4
-rw-r--r--  src/mongo/db/query/internal_plans.cpp                      |   9
-rw-r--r--  src/mongo/db/query/internal_plans.h                        |   6
-rw-r--r--  src/mongo/db/resumable_index_builds.idl                    |   4
-rw-r--r--  src/mongo/db/sorter/sorter.cpp                             |  73
-rw-r--r--  src/mongo/db/sorter/sorter.h                               |  11
-rw-r--r--  src/mongo/db/sorter/sorter_test.cpp                        |   1
20 files changed, 289 insertions, 225 deletions
diff --git a/jstests/noPassthrough/libs/index_build.js b/jstests/noPassthrough/libs/index_build.js
index 6f9e8cfd6a4..5f345fa3161 100644
--- a/jstests/noPassthrough/libs/index_build.js
+++ b/jstests/noPassthrough/libs/index_build.js
@@ -224,39 +224,36 @@ const ResumableIndexBuildTest = class {
}
/**
- * Creates the specified index in a parallel shell and expects it to fail with
- * InterruptedDueToReplStateChange. After calling this function and before waiting on the
- * returned parallel shell to complete, the test should cause the node to go through a replica
- * set state transition.
+ * Runs createIndexFn in a parallel shell to create an index, inserting the documents specified
+ * by sideWrites into the side writes table. createIndexFn should take three parameters:
+ * collection name, index specification, and index name. If {hangBeforeBuildingIndex: true},
+ * returns with the hangBeforeBuildingIndex failpoint enabled and the index build hanging at
+ * this point.
*/
- static createIndex(primary, collName, indexSpec, indexName) {
- return startParallelShell(
- funWithArgs(function(collName, indexSpec, indexName) {
- assert.commandFailedWithCode(
- db.getCollection(collName).createIndex(indexSpec, {name: indexName}),
- ErrorCodes.InterruptedDueToReplStateChange);
- }, collName, indexSpec, indexName), primary.port);
- }
+ static createIndexWithSideWrites(test,
+ createIndexFn,
+ coll,
+ indexSpec,
+ indexName,
+ sideWrites,
+ {hangBeforeBuildingIndex} = {hangBeforeBuildingIndex: false}) {
+ const primary = test.getPrimary();
+ const fp = configureFailPoint(primary, "hangBeforeBuildingIndex");
- /**
- * Inserts the given documents once an index build reaches the end of the bulk load phase so
- * that the documents are inserted into the side writes table for that index build.
- */
- static insertIntoSideWritesTable(primary, collName, docs) {
- return startParallelShell(funWithArgs(function(collName, docs) {
- if (!docs)
- return;
+ const awaitCreateIndex = startParallelShell(
+ funWithArgs(createIndexFn, coll.getName(), indexSpec, indexName), primary.port);
- load("jstests/libs/fail_point_util.js");
+ fp.wait();
+ assert.commandWorked(coll.insert(sideWrites));
- const sideWritesFp = configureFailPoint(
- db.getMongo(), "hangAfterSettingUpIndexBuild");
- sideWritesFp.wait();
+ // Before building the index, wait for the last op to be committed so that establishing
+ // the majority read cursor does not race with step down.
+ test.awaitLastOpCommitted();
- assert.commandWorked(db.getCollection(collName).insert(docs));
+ if (!hangBeforeBuildingIndex)
+ fp.off();
- sideWritesFp.off();
- }, collName, docs), primary.port);
+ return awaitCreateIndex;
}
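For reference, a minimal usage sketch of the new helper, assuming the usual noPassthrough
harness; the database name, index spec, and documents here are illustrative, not taken from
this change:

    load("jstests/noPassthrough/libs/index_build.js");

    const rst = new ReplSetTest({nodes: 1});
    rst.startSet();
    rst.initiate();

    const coll = rst.getPrimary().getDB("test").getCollection("demo");
    assert.commandWorked(coll.insert({a: 1}));

    // Start the index build, leave it hanging before the collection scan, and
    // have the helper insert the side writes while the build is paused.
    const awaitCreateIndex = ResumableIndexBuildTest.createIndexWithSideWrites(
        rst, function(collName, indexSpec, indexName) {
            assert.commandWorked(
                db.getCollection(collName).createIndex(indexSpec, {name: indexName}));
        }, coll, {a: 1}, "demo_index", [{a: 2}, {a: 3}], {hangBeforeBuildingIndex: true});

    // Release the failpoint that the helper left enabled, then wait for the build.
    assert.commandWorked(rst.getPrimary().adminCommand(
        {configureFailPoint: "hangBeforeBuildingIndex", mode: "off"}));
    awaitCreateIndex();
    rst.stopSet();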
/**
@@ -286,11 +283,7 @@ const ResumableIndexBuildTest = class {
.buildUUID);
// Don't interrupt the index build for shutdown until it is at the desired point.
- const shutdownFpTimesEntered =
- assert
- .commandWorked(
- conn.adminCommand({configureFailPoint: "hangBeforeShutdown", mode: "alwaysOn"}))
- .count;
+ const shutdownFpTimesEntered = configureFailPoint(conn, "hangBeforeShutdown").timesEntered;
const awaitContinueShutdown = startParallelShell(
funWithArgs(function(failPointName, failPointData, shutdownFpTimesEntered) {
@@ -307,8 +300,8 @@ const ResumableIndexBuildTest = class {
// Move the index build forward to the point that we want it to be interrupted for
// shutdown at.
const fp = configureFailPoint(db.getMongo(), failPointName, failPointData);
- assert.commandWorked(db.adminCommand(
- {configureFailPoint: "hangAfterSettingUpIndexBuildUnlocked", mode: "off"}));
+ assert.commandWorked(
+ db.adminCommand({configureFailPoint: "hangBeforeBuildingIndex", mode: "off"}));
fp.wait();
// Disabling this failpoint will allow shutdown to continue and cause the operation
@@ -343,9 +336,7 @@ const ResumableIndexBuildTest = class {
rst.awaitReplication();
- if (postIndexBuildInserts) {
- assert.commandWorked(coll.insert(postIndexBuildInserts));
- }
+ assert.commandWorked(coll.insert(postIndexBuildInserts));
for (const node of rst.nodes) {
const collection = node.getDB(dbName).getCollection(collName);
@@ -357,9 +348,9 @@ const ResumableIndexBuildTest = class {
/**
* Runs the resumable index build test specified by the provided failpoint information and
- * index spec on the provided replica set and namespace. Document(s) specified by
- * insertIntoSideWritesTable will be inserted during the collection scan phase so that they are
- * inserted into the side writes table and processed during the drain writes phase.
+ * index spec on the provided replica set and namespace. Documents specified by sideWrites will
+ * be inserted after the bulk load phase so that they are inserted into the side writes table
+ * and processed during the drain writes phase.
*/
static run(rst,
dbName,
@@ -367,8 +358,8 @@ const ResumableIndexBuildTest = class {
indexSpec,
failPointName,
failPointData,
- insertIntoSideWritesTable,
- postIndexBuildInserts = {}) {
+ sideWrites = [],
+ postIndexBuildInserts = []) {
const primary = rst.getPrimary();
if (!ResumableIndexBuildTest.resumableIndexBuildsEnabled(primary)) {
@@ -379,20 +370,16 @@ const ResumableIndexBuildTest = class {
const coll = primary.getDB(dbName).getCollection(collName);
const indexName = "resumable_index_build";
- const fp = configureFailPoint(primary, "hangAfterSettingUpIndexBuildUnlocked");
-
- const awaitInsertIntoSideWritesTable = ResumableIndexBuildTest.insertIntoSideWritesTable(
- primary, collName, insertIntoSideWritesTable);
-
- const awaitCreateIndex =
- ResumableIndexBuildTest.createIndex(primary, coll.getName(), indexSpec, indexName);
-
- fp.wait();
+ const awaitCreateIndex = ResumableIndexBuildTest.createIndexWithSideWrites(
+ rst, function(collName, indexSpec, indexName) {
+ assert.commandFailedWithCode(
+ db.getCollection(collName).createIndex(indexSpec, {name: indexName}),
+ ErrorCodes.InterruptedDueToReplStateChange);
+ }, coll, indexSpec, indexName, sideWrites, {hangBeforeBuildingIndex: true});
ResumableIndexBuildTest.restart(
rst, primary, coll, indexName, failPointName, failPointData);
- awaitInsertIntoSideWritesTable();
awaitCreateIndex();
ResumableIndexBuildTest.checkIndexes(
@@ -402,9 +389,9 @@ const ResumableIndexBuildTest = class {
/**
* Runs the resumable index build test specified by the provided failpoint information and
* index spec on the provided replica set and namespace. This will specifically be used to test
- * that resuming an index build on the former primary works. Document(s) specified by
- * insertIntoSideWritesTable will be inserted during the collection scan phase so that they are
- * inserted into the side writes table and processed during the drain writes phase.
+ * that resuming an index build on the former primary works. Documents specified by sideWrites
+ * will be inserted during the collection scan phase so that they are inserted into the side
+ * writes table and processed during the drain writes phase.
*/
static runOnPrimaryToTestCommitQuorum(rst,
dbName,
@@ -412,8 +399,8 @@ const ResumableIndexBuildTest = class {
indexSpec,
resumeNodeFailPointName,
otherNodeFailPointName,
- insertIntoSideWritesTable,
- postIndexBuildInserts) {
+ sideWrites = [],
+ postIndexBuildInserts = []) {
const resumeNode = rst.getPrimary();
const resumeDB = resumeNode.getDB(dbName);
@@ -423,17 +410,18 @@ const ResumableIndexBuildTest = class {
}
const secondary = rst.getSecondary();
- let coll = resumeDB.getCollection(collName);
+ const coll = resumeDB.getCollection(collName);
const indexName = "resumable_index_build";
const resumeNodeFp = configureFailPoint(resumeNode, resumeNodeFailPointName);
const otherNodeFp = configureFailPoint(secondary, otherNodeFailPointName);
- const awaitInsertIntoSideWritesTable = ResumableIndexBuildTest.insertIntoSideWritesTable(
- resumeNode, collName, insertIntoSideWritesTable);
-
- const awaitCreateIndex =
- ResumableIndexBuildTest.createIndex(resumeNode, coll.getName(), indexSpec, indexName);
+ const awaitCreateIndex = ResumableIndexBuildTest.createIndexWithSideWrites(
+ rst, function(collName, indexSpec, indexName) {
+ assert.commandFailedWithCode(
+ db.getCollection(collName).createIndex(indexSpec, {name: indexName}),
+ ErrorCodes.InterruptedDueToReplStateChange);
+ }, coll, indexSpec, indexName, sideWrites);
otherNodeFp.wait();
resumeNodeFp.wait();
@@ -443,8 +431,6 @@ const ResumableIndexBuildTest = class {
.assertIndexes(coll, 2, ["_id_"], [indexName], {includeBuildUUIDs: true})[indexName]
.buildUUID);
- awaitInsertIntoSideWritesTable();
-
clearRawMongoProgramOutput();
rst.stop(resumeNode);
assert(RegExp("4841502.*" + buildUUID).test(rawMongoProgramOutput()));
@@ -465,9 +451,9 @@ const ResumableIndexBuildTest = class {
/**
* Runs the resumable index build test specified by the provided failpoint information and
* index spec on the provided replica set and namespace. This will specifically be used to test
- * that resuming an index build on a secondary works. Document(s) specified by
- * insertIntoSideWritesTable will be inserted during the collection scan phase so that they are
- * inserted into the side writes table and processed during the drain writes phase.
+ * that resuming an index build on a secondary works. Documents specified by sideWrites will be
+ * inserted during the collection scan phase so that they are inserted into the side writes
+ * table and processed during the drain writes phase.
*/
static runOnSecondary(rst,
dbName,
@@ -476,37 +462,30 @@ const ResumableIndexBuildTest = class {
resumeNodeFailPointName,
resumeNodeFailPointData,
primaryFailPointName,
- insertIntoSideWritesTable,
- postIndexBuildInserts) {
+ sideWrites = [],
+ postIndexBuildInserts = []) {
const primary = rst.getPrimary();
const coll = primary.getDB(dbName).getCollection(collName);
const indexName = "resumable_index_build";
const resumeNode = rst.getSecondary();
const resumeNodeColl = resumeNode.getDB(dbName).getCollection(collName);
- const resumeNodeFp = configureFailPoint(resumeNode, "hangAfterSettingUpIndexBuildUnlocked");
- const sideWritesFp = configureFailPoint(resumeNode, "hangAfterSettingUpIndexBuild");
+ const resumeNodeFp = configureFailPoint(resumeNode, "hangBeforeBuildingIndex");
let primaryFp;
if (primaryFailPointName) {
primaryFp = configureFailPoint(primary, primaryFailPointName);
}
- const awaitCreateIndex = startParallelShell(
- funWithArgs(function(collName, indexSpec, indexName) {
+ const awaitCreateIndex = ResumableIndexBuildTest.createIndexWithSideWrites(
+ rst, function(collName, indexSpec, indexName) {
// If the secondary is shutdown for too long, the primary will step down until it
// can reach the secondary again. In this case, the index build will continue in the
// background.
assert.commandWorkedOrFailedWithCode(
db.getCollection(collName).createIndex(indexSpec, {name: indexName}),
ErrorCodes.InterruptedDueToReplStateChange);
- }, collName, indexSpec, indexName), primary.port);
-
- // Make sure that the resumeNode has paused its index build before inserting the writes
- // intended for the side writes table.
- sideWritesFp.wait();
- assert.commandWorked(coll.insert(insertIntoSideWritesTable, {"writeConcern": {w: 2}}));
- sideWritesFp.off();
+ }, coll, indexSpec, indexName, sideWrites);
resumeNodeFp.wait();
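The secondary-resume variant is driven the same way; a minimal sketch, assuming a two-node
replica set and using a failpoint this change touches (failpoint data and documents are
illustrative):

    ResumableIndexBuildTest.runOnSecondary(
        rst,
        "test",
        coll.getName(),
        {a: 1},
        "hangIndexBuildDuringCollectionScanPhaseBeforeInsertion",  // resumeNodeFailPointName
        {iteration: 0},                                            // resumeNodeFailPointData
        undefined,                                                 // primaryFailPointName
        [{a: 2}, {a: 3}],                                          // sideWrites
        [{a: 4}]);                                                 // postIndexBuildInserts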
diff --git a/jstests/noPassthrough/resumable_index_build_initialized.js b/jstests/noPassthrough/resumable_index_build_initialized.js
new file mode 100644
index 00000000000..354bb332452
--- /dev/null
+++ b/jstests/noPassthrough/resumable_index_build_initialized.js
@@ -0,0 +1,28 @@
+/**
+ * Tests that resumable index build state is written to disk upon clean shutdown when an index
+ * build has been initialized but has not yet begun the collection scan phase.
+ *
+ * @tags: [
+ * requires_persistence,
+ * requires_replication,
+ * ]
+ */
+(function() {
+"use strict";
+
+load("jstests/noPassthrough/libs/index_build.js");
+
+const dbName = "test";
+const failPointName = "hangIndexBuildBeforeWaitingUntilMajorityOpTime";
+
+const rst = new ReplSetTest({nodes: 1});
+rst.startSet();
+rst.initiate();
+
+const coll = rst.getPrimary().getDB(dbName).getCollection(jsTestName());
+assert.commandWorked(coll.insert({a: 1}));
+
+ResumableIndexBuildTest.run(rst, dbName, coll.getName(), {a: 1}, failPointName, {});
+
+rst.stopSet();
+})(); \ No newline at end of file
diff --git a/jstests/replsets/libs/rollback_resumable_index_build.js b/jstests/replsets/libs/rollback_resumable_index_build.js
index a1ff7e4e65f..0ab2148e783 100644
--- a/jstests/replsets/libs/rollback_resumable_index_build.js
+++ b/jstests/replsets/libs/rollback_resumable_index_build.js
@@ -7,9 +7,8 @@ const RollbackResumableIndexBuildTest = class {
* rollback starts is specified by rollbackStartFailPointName. The phase that the index build
* will resume from after rollback completes is specified by rollbackEndFailPointName. If
* either of these points is in the drain writes phase, documents to insert into the side
- * writes table must be specified by insertIntoSideWritesTable. Documents specified by
- * insertsToBeRolledBack are inserted after transitioning to rollback operations and will be
- * rolled back.
+ * writes table must be specified by sideWrites. Documents specified by insertsToBeRolledBack
+ * are inserted after transitioning to rollback operations and will be rolled back.
*/
static run(rollbackTest,
dbName,
@@ -20,7 +19,7 @@ const RollbackResumableIndexBuildTest = class {
rollbackEndFailPointName,
rollbackEndFailPointData,
insertsToBeRolledBack,
- insertIntoSideWritesTable) {
+ sideWrites = []) {
const originalPrimary = rollbackTest.getPrimary();
if (!ResumableIndexBuildTest.resumableIndexBuildsEnabled(originalPrimary)) {
@@ -38,15 +37,15 @@ const RollbackResumableIndexBuildTest = class {
const rollbackStartFp = configureFailPoint(
originalPrimary, rollbackStartFailPointName, rollbackStartFailPointData);
- const awaitInsertIntoSideWritesTable = ResumableIndexBuildTest.insertIntoSideWritesTable(
- originalPrimary, collName, insertIntoSideWritesTable);
-
- const awaitCreateIndex =
- ResumableIndexBuildTest.createIndex(originalPrimary, collName, indexSpec, indexName);
+ const awaitCreateIndex = ResumableIndexBuildTest.createIndexWithSideWrites(
+ rollbackTest, function(collName, indexSpec, indexName) {
+ assert.commandFailedWithCode(
+ db.getCollection(collName).createIndex(indexSpec, {name: indexName}),
+ ErrorCodes.InterruptedDueToReplStateChange);
+ }, coll, indexSpec, indexName, sideWrites);
// Wait until we've completed the last operation that won't be rolled back so that we can
// begin the operations that will be rolled back.
- awaitInsertIntoSideWritesTable();
rollbackEndFp.wait();
const buildUUID = extractUUIDFromObject(
@@ -120,9 +119,9 @@ const RollbackResumableIndexBuildTest = class {
* Runs the resumable index build rollback test in the case where rollback begins after the
* index build has already completed. The point during the index build to roll back to is
* specified by rollbackEndFailPointName. If this point is in the drain writes phase, documents
- * to insert into the side writes table must be specified by insertIntoSideWritesTable.
- * Documents specified by insertsToBeRolledBack are inserted after transitioning to rollback
- * operations and will be rolled back.
+ * to insert into the side writes table must be specified by sideWrites. Documents specified by
+ * insertsToBeRolledBack are inserted after transitioning to rollback operations and will be
+ * rolled back.
*/
static runIndexBuildComplete(rollbackTest,
dbName,
@@ -131,7 +130,7 @@ const RollbackResumableIndexBuildTest = class {
rollbackEndFailPointName,
rollbackEndFailPointData,
insertsToBeRolledBack,
- insertIntoSideWritesTable) {
+ sideWrites = []) {
const originalPrimary = rollbackTest.getPrimary();
if (!ResumableIndexBuildTest.resumableIndexBuildsEnabled(originalPrimary)) {
@@ -147,23 +146,19 @@ const RollbackResumableIndexBuildTest = class {
const rollbackEndFp =
configureFailPoint(originalPrimary, rollbackEndFailPointName, rollbackEndFailPointData);
- const awaitInsertIntoSideWritesTable = ResumableIndexBuildTest.insertIntoSideWritesTable(
- originalPrimary, collName, insertIntoSideWritesTable);
-
- const awaitCreateIndex =
- startParallelShell(funWithArgs(function(collName, indexSpec, indexName) {
- assert.commandWorked(db.runCommand({
- createIndexes: collName,
- indexes: [{key: indexSpec, name: indexName}],
- // Commit quorum is disabled so that the index build can
- // complete while the primary is isolated and will roll back.
- commitQuorum: 0
- }));
- }, collName, indexSpec, indexName), originalPrimary.port);
+ const awaitCreateIndex = ResumableIndexBuildTest.createIndexWithSideWrites(
+ rollbackTest, function(collName, indexSpec, indexName) {
+ assert.commandWorked(db.runCommand({
+ createIndexes: collName,
+ indexes: [{key: indexSpec, name: indexName}],
+ // Commit quorum is disabled so that the index build can
+ // complete while the primary is isolated and will roll back.
+ commitQuorum: 0
+ }));
+ }, coll, indexSpec, indexName, sideWrites);
// Wait until we reach the desired ending point so that we can begin the operations that
// will be rolled back.
- awaitInsertIntoSideWritesTable();
rollbackEndFp.wait();
const buildUUID = extractUUIDFromObject(
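For orientation, a hedged sketch of invoking the rollback test with the renamed sideWrites
parameter. Both failpoints named here exist elsewhere in this change, but the iteration
values and the choice of rollback start/end points are illustrative assumptions:

    RollbackResumableIndexBuildTest.run(
        rollbackTest,
        "test",
        "demo",
        {a: 1},
        "hangIndexBuildDuringCollectionScanPhaseAfterInsertion",   // rollback start
        {iteration: 1},
        "hangIndexBuildDuringCollectionScanPhaseBeforeInsertion",  // rollback end
        {iteration: 0},
        [{a: 10}, {a: 11}],  // insertsToBeRolledBack
        []);                 // sideWrites: only required when a failpoint is in
                             // the drain writes phase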
diff --git a/src/mongo/db/catalog/collection.h b/src/mongo/db/catalog/collection.h
index 9b0d5f5b223..e73633c6d65 100644
--- a/src/mongo/db/catalog/collection.h
+++ b/src/mongo/db/catalog/collection.h
@@ -548,7 +548,8 @@ public:
virtual std::unique_ptr<PlanExecutor, PlanExecutor::Deleter> makePlanExecutor(
OperationContext* opCtx,
PlanYieldPolicy::YieldPolicy yieldPolicy,
- ScanDirection scanDirection) const = 0;
+ ScanDirection scanDirection,
+ boost::optional<RecordId> resumeAfterRecordId = boost::none) const = 0;
virtual void indexBuildSuccess(OperationContext* opCtx, IndexCatalogEntry* index) = 0;
diff --git a/src/mongo/db/catalog/collection_impl.cpp b/src/mongo/db/catalog/collection_impl.cpp
index 3aeebc627b5..5b8b8a0b1e7 100644
--- a/src/mongo/db/catalog/collection_impl.cpp
+++ b/src/mongo/db/catalog/collection_impl.cpp
@@ -1209,10 +1209,12 @@ StatusWith<std::vector<BSONObj>> CollectionImpl::addCollationDefaultsToIndexSpec
std::unique_ptr<PlanExecutor, PlanExecutor::Deleter> CollectionImpl::makePlanExecutor(
OperationContext* opCtx,
PlanYieldPolicy::YieldPolicy yieldPolicy,
- ScanDirection scanDirection) const {
+ ScanDirection scanDirection,
+ boost::optional<RecordId> resumeAfterRecordId) const {
auto isForward = scanDirection == ScanDirection::kForward;
auto direction = isForward ? InternalPlanner::FORWARD : InternalPlanner::BACKWARD;
- return InternalPlanner::collectionScan(opCtx, _ns.ns(), this, yieldPolicy, direction);
+ return InternalPlanner::collectionScan(
+ opCtx, _ns.ns(), this, yieldPolicy, direction, resumeAfterRecordId);
}
void CollectionImpl::setNs(NamespaceString nss) {
diff --git a/src/mongo/db/catalog/collection_impl.h b/src/mongo/db/catalog/collection_impl.h
index 573cda25612..d3ab6380954 100644
--- a/src/mongo/db/catalog/collection_impl.h
+++ b/src/mongo/db/catalog/collection_impl.h
@@ -353,7 +353,8 @@ public:
std::unique_ptr<PlanExecutor, PlanExecutor::Deleter> makePlanExecutor(
OperationContext* opCtx,
PlanYieldPolicy::YieldPolicy yieldPolicy,
- ScanDirection scanDirection) const final;
+ ScanDirection scanDirection,
+ boost::optional<RecordId> resumeAfterRecordId) const final;
void indexBuildSuccess(OperationContext* opCtx, IndexCatalogEntry* index) final;
diff --git a/src/mongo/db/catalog/collection_mock.h b/src/mongo/db/catalog/collection_mock.h
index e07a110f8ef..54e2ab7b76f 100644
--- a/src/mongo/db/catalog/collection_mock.h
+++ b/src/mongo/db/catalog/collection_mock.h
@@ -274,7 +274,8 @@ public:
std::unique_ptr<PlanExecutor, PlanExecutor::Deleter> makePlanExecutor(
OperationContext* opCtx,
PlanYieldPolicy::YieldPolicy yieldPolicy,
- ScanDirection scanDirection) const {
+ ScanDirection scanDirection,
+ boost::optional<RecordId> resumeAfterRecordId) const {
std::abort();
}
diff --git a/src/mongo/db/catalog/index_builds_manager.cpp b/src/mongo/db/catalog/index_builds_manager.cpp
index b8298ca598a..bfa9dd3bcf2 100644
--- a/src/mongo/db/catalog/index_builds_manager.cpp
+++ b/src/mongo/db/catalog/index_builds_manager.cpp
@@ -122,10 +122,11 @@ Status IndexBuildsManager::setUpIndexBuild(OperationContext* opCtx,
Status IndexBuildsManager::startBuildingIndex(OperationContext* opCtx,
Collection* collection,
- const UUID& buildUUID) {
+ const UUID& buildUUID,
+ boost::optional<RecordId> resumeAfterRecordId) {
auto builder = invariant(_getBuilder(buildUUID));
- return builder->insertAllDocumentsInCollection(opCtx, collection);
+ return builder->insertAllDocumentsInCollection(opCtx, collection, resumeAfterRecordId);
}
Status IndexBuildsManager::resumeBuildingIndexFromBulkLoadPhase(OperationContext* opCtx,
diff --git a/src/mongo/db/catalog/index_builds_manager.h b/src/mongo/db/catalog/index_builds_manager.h
index 3980e390543..8c61dc72069 100644
--- a/src/mongo/db/catalog/index_builds_manager.h
+++ b/src/mongo/db/catalog/index_builds_manager.h
@@ -97,7 +97,8 @@ public:
*/
Status startBuildingIndex(OperationContext* opCtx,
Collection* collection,
- const UUID& buildUUID);
+ const UUID& buildUUID,
+ boost::optional<RecordId> resumeAfterRecordId = boost::none);
Status resumeBuildingIndexFromBulkLoadPhase(OperationContext* opCtx, const UUID& buildUUID);
diff --git a/src/mongo/db/catalog/multi_index_block.cpp b/src/mongo/db/catalog/multi_index_block.cpp
index 1b7f3f57ca4..8cd8b6d6222 100644
--- a/src/mongo/db/catalog/multi_index_block.cpp
+++ b/src/mongo/db/catalog/multi_index_block.cpp
@@ -74,32 +74,39 @@ MONGO_FAIL_POINT_DEFINE(leaveIndexBuildUnfinishedForShutdown);
namespace {
-void failPointHangDuringBuild(OperationContext* opCtx,
- FailPoint* fp,
- StringData where,
- const BSONObj& doc,
- unsigned long long iteration) {
- fp->executeIf(
- [=, &doc](const BSONObj& data) {
- LOGV2(20386,
- "Hanging index build during collection scan phase insertion",
- "where"_attr = where,
- "doc"_attr = doc);
-
- fp->pauseWhileSet(opCtx);
- },
- [&doc, iteration](const BSONObj& data) {
- if (data.hasField("iteration")) {
- return iteration == static_cast<unsigned long long>(data["iteration"].numberLong());
- }
+Status failPointHangDuringBuild(OperationContext* opCtx,
+ FailPoint* fp,
+ StringData where,
+ const BSONObj& doc,
+ unsigned long long iteration) {
+ try {
+ fp->executeIf(
+ [=, &doc](const BSONObj& data) {
+ LOGV2(20386,
+ "Hanging index build during collection scan phase insertion",
+ "where"_attr = where,
+ "doc"_attr = doc);
+
+ fp->pauseWhileSet(opCtx);
+ },
+ [&doc, iteration](const BSONObj& data) {
+ if (data.hasField("iteration")) {
+ return iteration ==
+ static_cast<unsigned long long>(data["iteration"].numberLong());
+ }
- auto fieldsToMatch = data.getObjectField("fieldsToMatch");
- return std::all_of(
- fieldsToMatch.begin(), fieldsToMatch.end(), [&doc](const auto& elem) {
- return SimpleBSONElementComparator::kInstance.evaluate(elem ==
- doc[elem.fieldName()]);
- });
- });
+ auto fieldsToMatch = data.getObjectField("fieldsToMatch");
+ return std::all_of(
+ fieldsToMatch.begin(), fieldsToMatch.end(), [&doc](const auto& elem) {
+ return SimpleBSONElementComparator::kInstance.evaluate(
+ elem == doc[elem.fieldName()]);
+ });
+ });
+ } catch (const ExceptionForCat<ErrorCategory::Interruption>&) {
+ return exceptionToStatus();
+ }
+
+ return Status::OK();
}
} // namespace
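The executeIf predicate above supports two activation modes for these hang failpoints; a
shell-side sketch, assuming a connection named primary (field values illustrative):

    // Hang before inserting the third scanned document (iteration is 0-based).
    configureFailPoint(primary,
                       "hangIndexBuildDuringCollectionScanPhaseBeforeInsertion",
                       {iteration: 2});

    // Or hang only when the scanned document matches the given fields.
    configureFailPoint(primary,
                       "hangIndexBuildDuringCollectionScanPhaseAfterInsertion",
                       {fieldsToMatch: {a: 1}});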
@@ -204,10 +211,7 @@ StatusWith<std::vector<BSONObj>> MultiIndexBlock::init(
invariant(_indexes.empty());
- // TODO (SERVER-49409): Resume from the collection scan phase.
- if (resumeInfo &&
- (resumeInfo->getPhase() == IndexBuildPhaseEnum::kBulkLoad ||
- resumeInfo->getPhase() == IndexBuildPhaseEnum::kDrainWrites)) {
+ if (resumeInfo) {
_phase = resumeInfo->getPhase();
}
@@ -311,8 +315,9 @@ StatusWith<std::vector<BSONObj>> MultiIndexBlock::init(
index.bulk = index.real->initiateBulk(
eachIndexBuildMaxMemoryUsageBytes,
- // TODO (SERVER-49409): Resume from the collection scan phase.
- resumeInfo && resumeInfo->getPhase() == IndexBuildPhaseEnum::kBulkLoad
+ // When resuming from the drain writes phase, there is no Sorter state to
+ // reconstruct.
+ resumeInfo && resumeInfo->getPhase() != IndexBuildPhaseEnum::kDrainWrites
? sorterInfo
: boost::none);
@@ -387,8 +392,10 @@ StatusWith<std::vector<BSONObj>> MultiIndexBlock::init(
}
}
-Status MultiIndexBlock::insertAllDocumentsInCollection(OperationContext* opCtx,
- Collection* collection) {
+Status MultiIndexBlock::insertAllDocumentsInCollection(
+ OperationContext* opCtx,
+ Collection* collection,
+ boost::optional<RecordId> resumeAfterRecordId) {
invariant(!_buildIsCleanedUp);
invariant(opCtx->lockState()->isNoop() || !opCtx->lockState()->inAWriteUnitOfWork());
@@ -447,15 +454,18 @@ Status MultiIndexBlock::insertAllDocumentsInCollection(OperationContext* opCtx,
} else {
yieldPolicy = PlanYieldPolicy::YieldPolicy::WRITE_CONFLICT_RETRY_ONLY;
}
- auto exec =
- collection->makePlanExecutor(opCtx, yieldPolicy, Collection::ScanDirection::kForward);
+ auto exec = collection->makePlanExecutor(
+ opCtx, yieldPolicy, Collection::ScanDirection::kForward, resumeAfterRecordId);
// Hint to the storage engine that this collection scan should not keep data in the cache.
bool readOnce = useReadOnceCursorsForIndexBuilds.load();
opCtx->recoveryUnit()->setReadOnce(readOnce);
try {
- invariant(_phase == IndexBuildPhaseEnum::kInitialized,
+ // The phase will be kCollectionScan when resuming an index build from the collection scan
+ // phase.
+ invariant(_phase == IndexBuildPhaseEnum::kInitialized ||
+ _phase == IndexBuildPhaseEnum::kCollectionScan,
IndexBuildPhase_serializer(_phase).toString());
_phase = IndexBuildPhaseEnum::kCollectionScan;
@@ -466,7 +476,7 @@ Status MultiIndexBlock::insertAllDocumentsInCollection(OperationContext* opCtx,
MONGO_unlikely(hangAfterStartingIndexBuild.shouldFail())) {
auto interruptStatus = opCtx->checkForInterruptNoAssert();
if (!interruptStatus.isOK())
- return opCtx->checkForInterruptNoAssert();
+ return interruptStatus;
if (PlanExecutor::ADVANCED != state) {
continue;
@@ -474,11 +484,14 @@ Status MultiIndexBlock::insertAllDocumentsInCollection(OperationContext* opCtx,
progress->setTotalWhileRunning(collection->numRecords(opCtx));
- failPointHangDuringBuild(opCtx,
- &hangIndexBuildDuringCollectionScanPhaseBeforeInsertion,
- "before",
- objToIndex,
- n);
+ interruptStatus =
+ failPointHangDuringBuild(opCtx,
+ &hangIndexBuildDuringCollectionScanPhaseBeforeInsertion,
+ "before",
+ objToIndex,
+ n);
+ if (!interruptStatus.isOK())
+ return interruptStatus;
// The external sorter is not part of the storage engine and therefore does not need a
// WriteUnitOfWork to write keys.
@@ -491,7 +504,8 @@ Status MultiIndexBlock::insertAllDocumentsInCollection(OperationContext* opCtx,
&hangIndexBuildDuringCollectionScanPhaseAfterInsertion,
"after",
objToIndex,
- n);
+ n)
+ .ignore();
// Go to the next document.
progress->hit();
@@ -891,10 +905,12 @@ BSONObj MultiIndexBlock::_constructStateObject() const {
BSONObjBuilder indexInfo(indexesArray.subobjStart());
- if (_phase == IndexBuildPhaseEnum::kCollectionScan ||
- _phase == IndexBuildPhaseEnum::kBulkLoad) {
+ if (_phase != IndexBuildPhaseEnum::kDrainWrites) {
auto state = index.bulk->getPersistedSorterState();
+ // TODO (SERVER-50289): Consider not including tempDir in the persisted resumable index
+ // build state.
+ indexInfo.append("tempDir", state.tempDir);
indexInfo.append("fileName", state.fileName);
indexInfo.append("numKeys", index.bulk->getKeysInserted());
diff --git a/src/mongo/db/catalog/multi_index_block.h b/src/mongo/db/catalog/multi_index_block.h
index e375fffd094..5e52419dd1c 100644
--- a/src/mongo/db/catalog/multi_index_block.h
+++ b/src/mongo/db/catalog/multi_index_block.h
@@ -149,7 +149,10 @@ public:
*
* Should not be called inside of a WriteUnitOfWork.
*/
- Status insertAllDocumentsInCollection(OperationContext* opCtx, Collection* collection);
+ Status insertAllDocumentsInCollection(
+ OperationContext* opCtx,
+ Collection* collection,
+ boost::optional<RecordId> resumeAfterRecordId = boost::none);
/**
* Call this after init() for each document in the collection.
diff --git a/src/mongo/db/index/index_access_method.cpp b/src/mongo/db/index/index_access_method.cpp
index 2489279cb76..7cd46f8093b 100644
--- a/src/mongo/db/index/index_access_method.cpp
+++ b/src/mongo/db/index/index_access_method.cpp
@@ -506,12 +506,14 @@ AbstractIndexAccessMethod::BulkBuilderImpl::BulkBuilderImpl(IndexCatalogEntry* i
size_t maxMemoryUsageBytes,
const IndexSorterInfo& sorterInfo)
: _indexCatalogEntry(index),
- _sorter(Sorter::makeFromExistingRanges(
- sorterInfo.getFileName()->toString(),
- *sorterInfo.getRanges(),
- SortOptions().ExtSortAllowed().MaxMemoryUsageBytes(maxMemoryUsageBytes),
- BtreeExternalSortComparison(),
- _makeSorterSettings())),
+ _sorter(Sorter::makeFromExistingRanges(sorterInfo.getFileName()->toString(),
+ *sorterInfo.getRanges(),
+ SortOptions()
+ .TempDir(sorterInfo.getTempDir()->toString())
+ .ExtSortAllowed()
+ .MaxMemoryUsageBytes(maxMemoryUsageBytes),
+ BtreeExternalSortComparison(),
+ _makeSorterSettings())),
_keysInserted(*sorterInfo.getNumKeys()) {}
Status AbstractIndexAccessMethod::BulkBuilderImpl::insert(OperationContext* opCtx,
diff --git a/src/mongo/db/index_builds_coordinator.cpp b/src/mongo/db/index_builds_coordinator.cpp
index cd4b017a3f0..0b0bae768b3 100644
--- a/src/mongo/db/index_builds_coordinator.cpp
+++ b/src/mongo/db/index_builds_coordinator.cpp
@@ -78,6 +78,8 @@ MONGO_FAIL_POINT_DEFINE(hangIndexBuildBeforeAbortCleanUp);
MONGO_FAIL_POINT_DEFINE(hangIndexBuildOnStepUp);
MONGO_FAIL_POINT_DEFINE(hangAfterSettingUpResumableIndexBuild);
MONGO_FAIL_POINT_DEFINE(hangIndexBuildBeforeCommit);
+MONGO_FAIL_POINT_DEFINE(hangBeforeBuildingIndex);
+MONGO_FAIL_POINT_DEFINE(hangIndexBuildBeforeWaitingUntilMajorityOpTime);
namespace {
@@ -2262,10 +2264,7 @@ void IndexBuildsCoordinator::_runIndexBuildInner(
hangAfterInitializingIndexBuild.pauseWhileSet(opCtx);
}
- // TODO (SERVER-49409): Resume from the collection scan phase.
- if (resumeInfo &&
- (resumeInfo->getPhase() == IndexBuildPhaseEnum::kBulkLoad ||
- resumeInfo->getPhase() == IndexBuildPhaseEnum::kDrainWrites)) {
+ if (resumeInfo) {
_resumeIndexBuildFromPhase(opCtx, replState, indexBuildOptions, resumeInfo.get());
} else {
_buildIndex(opCtx, replState, indexBuildOptions);
@@ -2339,8 +2338,17 @@ void IndexBuildsCoordinator::_resumeIndexBuildFromPhase(
hangAfterSettingUpResumableIndexBuild.pauseWhileSet();
}
- if (resumeInfo.getPhase() == IndexBuildPhaseEnum::kBulkLoad)
+ if (resumeInfo.getPhase() == IndexBuildPhaseEnum::kInitialized ||
+ resumeInfo.getPhase() == IndexBuildPhaseEnum::kCollectionScan) {
+ _scanCollectionAndInsertSortedKeysIntoIndex(
+ opCtx,
+ replState,
+ resumeInfo.getCollectionScanPosition()
+ ? boost::make_optional<RecordId>(RecordId(*resumeInfo.getCollectionScanPosition()))
+ : boost::none);
+ } else if (resumeInfo.getPhase() == IndexBuildPhaseEnum::kBulkLoad) {
_insertSortedKeysIntoIndexForResume(opCtx, replState);
+ }
_insertKeysFromSideTablesWithoutBlockingWrites(opCtx, replState);
_signalPrimaryForCommitReadiness(opCtx, replState);
@@ -2364,6 +2372,13 @@ void IndexBuildsCoordinator::_awaitLastOpTimeBeforeInterceptorsMajorityCommitted
"buildUUID"_attr = replState->buildUUID,
"lastOpTime"_attr = replState->lastOpTimeBeforeInterceptors);
+ if (MONGO_unlikely(hangIndexBuildBeforeWaitingUntilMajorityOpTime.shouldFail())) {
+ LOGV2(4940901,
+ "Hanging index build before waiting for the last optime before interceptors to be "
+ "majority committed due to hangIndexBuildBeforeWaitingUntilMajorityOpTime failpoint");
+ hangIndexBuildBeforeWaitingUntilMajorityOpTime.pauseWhileSet(opCtx);
+ }
+
auto status =
replCoord->waitUntilMajorityOpTime(opCtx, replState->lastOpTimeBeforeInterceptors);
uassertStatusOK(status);
@@ -2377,6 +2392,11 @@ void IndexBuildsCoordinator::_awaitLastOpTimeBeforeInterceptorsMajorityCommitted
void IndexBuildsCoordinator::_buildIndex(OperationContext* opCtx,
std::shared_ptr<ReplIndexBuildState> replState,
const IndexBuildOptions& indexBuildOptions) {
+ if (MONGO_unlikely(hangBeforeBuildingIndex.shouldFail())) {
+ LOGV2(4940900, "Hanging before building index due to hangBeforeBuildingIndex failpoint");
+ hangBeforeBuildingIndex.pauseWhileSet();
+ }
+
// Read without a timestamp. When we commit, we block writes which guarantees all writes are
// visible.
opCtx->recoveryUnit()->setTimestampReadSource(RecoveryUnit::ReadSource::kNoTimestamp);
@@ -2394,7 +2414,9 @@ void IndexBuildsCoordinator::_buildIndex(OperationContext* opCtx,
* Second phase is extracting the sorted keys and writing them into the new index table.
*/
void IndexBuildsCoordinator::_scanCollectionAndInsertSortedKeysIntoIndex(
- OperationContext* opCtx, std::shared_ptr<ReplIndexBuildState> replState) {
+ OperationContext* opCtx,
+ std::shared_ptr<ReplIndexBuildState> replState,
+ boost::optional<RecordId> resumeAfterRecordId) {
// Collection scan and insert into index.
{
@@ -2418,8 +2440,8 @@ void IndexBuildsCoordinator::_scanCollectionAndInsertSortedKeysIntoIndex(
CollectionCatalog::get(opCtx).lookupCollectionByUUID(opCtx, replState->collectionUUID);
invariant(collection);
- uassertStatusOK(
- _indexBuildsManager.startBuildingIndex(opCtx, collection, replState->buildUUID));
+ uassertStatusOK(_indexBuildsManager.startBuildingIndex(
+ opCtx, collection, replState->buildUUID, resumeAfterRecordId));
}
if (MONGO_unlikely(hangAfterIndexBuildDumpsInsertsFromBulk.shouldFail())) {
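With the coordinator now threading a resumeAfterRecordId down to the collection scan, resume
from the collection scan phase can be exercised end to end through the test library; a hedged
sketch (index spec and documents illustrative):

    // Shut down while hung mid-scan; on restart the build resumes from the
    // persisted collection scan position rather than restarting the scan.
    ResumableIndexBuildTest.run(rst,
                                "test",
                                coll.getName(),
                                {a: 1},
                                "hangIndexBuildDuringCollectionScanPhaseBeforeInsertion",
                                {iteration: 0},
                                [{a: 2}, {a: 3}],   // sideWrites
                                [{a: 4}, {a: 5}]);  // postIndexBuildInserts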
diff --git a/src/mongo/db/index_builds_coordinator.h b/src/mongo/db/index_builds_coordinator.h
index bbddc12f2dc..ead5e5e39b1 100644
--- a/src/mongo/db/index_builds_coordinator.h
+++ b/src/mongo/db/index_builds_coordinator.h
@@ -653,7 +653,9 @@ protected:
* Second phase is extracting the sorted keys and writing them into the new index table.
*/
void _scanCollectionAndInsertSortedKeysIntoIndex(
- OperationContext* opCtx, std::shared_ptr<ReplIndexBuildState> replState);
+ OperationContext* opCtx,
+ std::shared_ptr<ReplIndexBuildState> replState,
+ boost::optional<RecordId> resumeAfterRecordId = boost::none);
/**
* Performs the second phase of the index build, for use when resuming from the second phase.
*/
diff --git a/src/mongo/db/query/internal_plans.cpp b/src/mongo/db/query/internal_plans.cpp
index 21a76b11120..264a467492f 100644
--- a/src/mongo/db/query/internal_plans.cpp
+++ b/src/mongo/db/query/internal_plans.cpp
@@ -53,7 +53,8 @@ std::unique_ptr<PlanExecutor, PlanExecutor::Deleter> InternalPlanner::collection
StringData ns,
const Collection* collection,
PlanYieldPolicy::YieldPolicy yieldPolicy,
- const Direction direction) {
+ const Direction direction,
+ boost::optional<RecordId> resumeAfterRecordId) {
std::unique_ptr<WorkingSet> ws = std::make_unique<WorkingSet>();
auto expCtx = make_intrusive<ExpressionContext>(
@@ -70,7 +71,7 @@ std::unique_ptr<PlanExecutor, PlanExecutor::Deleter> InternalPlanner::collection
invariant(ns == collection->ns().ns());
- auto cs = _collectionScan(expCtx, ws.get(), collection, direction);
+ auto cs = _collectionScan(expCtx, ws.get(), collection, direction, resumeAfterRecordId);
// Takes ownership of 'ws' and 'cs'.
auto statusWithPlanExecutor =
@@ -201,12 +202,14 @@ std::unique_ptr<PlanStage> InternalPlanner::_collectionScan(
const boost::intrusive_ptr<ExpressionContext>& expCtx,
WorkingSet* ws,
const Collection* collection,
- Direction direction) {
+ Direction direction,
+ boost::optional<RecordId> resumeAfterRecordId) {
invariant(collection);
CollectionScanParams params;
params.shouldWaitForOplogVisibility =
shouldWaitForOplogVisibility(expCtx->opCtx, collection, false);
+ params.resumeAfterRecordId = resumeAfterRecordId;
if (FORWARD == direction) {
params.direction = CollectionScanParams::FORWARD;
diff --git a/src/mongo/db/query/internal_plans.h b/src/mongo/db/query/internal_plans.h
index c6bfc9b5dc4..0dc87bc96cd 100644
--- a/src/mongo/db/query/internal_plans.h
+++ b/src/mongo/db/query/internal_plans.h
@@ -74,7 +74,8 @@ public:
StringData ns,
const Collection* collection,
PlanYieldPolicy::YieldPolicy yieldPolicy,
- const Direction direction = FORWARD);
+ const Direction direction = FORWARD,
+ boost::optional<RecordId> resumeAfterRecordId = boost::none);
/**
* Returns a FETCH => DELETE plan.
@@ -135,7 +136,8 @@ private:
const boost::intrusive_ptr<ExpressionContext>& expCtx,
WorkingSet* ws,
const Collection* collection,
- Direction direction);
+ Direction direction,
+ boost::optional<RecordId> resumeAfterRecordId = boost::none);
/**
* Returns a plan stage that is either an index scan or an index scan with a fetch stage.
diff --git a/src/mongo/db/resumable_index_builds.idl b/src/mongo/db/resumable_index_builds.idl
index bbe2f80dcd4..74a53553100 100644
--- a/src/mongo/db/resumable_index_builds.idl
+++ b/src/mongo/db/resumable_index_builds.idl
@@ -68,6 +68,10 @@ structs:
for this index build"
type: string
optional: true
+ tempDir:
+ description: "The directory into which we place a file when spilling data to disk"
+ type: string
+ optional: true
fileName:
description: "The name of the file that sorted data is written to"
type: string
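With tempDir persisted, each index's entry in the resumable index build state roughly takes
the shape below. This is an illustrative document only; fields not named in this change, such
as the exact range subfields, are assumptions:

    {
        tempDir: "/data/db/_tmp",  // new: directory the Sorter spills into
        fileName: "extsort.1",     // spill file to reopen on resume
        numKeys: NumberLong(100),
        ranges: [{startOffset: NumberLong(0), endOffset: NumberLong(8192), checksum: 0}]
    }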
diff --git a/src/mongo/db/sorter/sorter.cpp b/src/mongo/db/sorter/sorter.cpp
index 099e8b25e5e..514578b974d 100644
--- a/src/mongo/db/sorter/sorter.cpp
+++ b/src/mongo/db/sorter/sorter.cpp
@@ -539,11 +539,8 @@ public:
NoLimitSorter(const SortOptions& opts,
const Comparator& comp,
const Settings& settings = Settings())
- : _comp(comp), _settings(settings), _opts(opts) {
- verify(_opts.limit == 0);
- if (_opts.extSortAllowed) {
- this->_fileName = _opts.tempDir + "/" + nextFileName();
- }
+ : Sorter<Key, Value>(opts), _comp(comp), _settings(settings) {
+ invariant(opts.limit == 0);
}
NoLimitSorter(const std::string& fileName,
@@ -551,14 +548,13 @@ public:
const SortOptions& opts,
const Comparator& comp,
const Settings& settings = Settings())
- : _comp(comp),
+ : Sorter<Key, Value>(opts, fileName),
+ _comp(comp),
_settings(settings),
- _opts(opts),
- _nextSortedFileWriterOffset(ranges.back().getEndOffset()) {
- invariant(_opts.extSortAllowed);
+ _nextSortedFileWriterOffset(!ranges.empty() ? ranges.back().getEndOffset() : 0) {
+ invariant(opts.extSortAllowed);
this->_usedDisk = true;
- this->_fileName = fileName;
std::transform(ranges.begin(),
ranges.end(),
@@ -589,7 +585,7 @@ public:
_memUsed += key.memUsageForSorter();
_memUsed += val.memUsageForSorter();
- if (_memUsed > _opts.maxMemoryUsageBytes)
+ if (_memUsed > this->_opts.maxMemoryUsageBytes)
spill();
}
@@ -601,7 +597,7 @@ public:
_data->emplace_back(std::move(key), std::move(val));
- if (_memUsed > _opts.maxMemoryUsageBytes)
+ if (_memUsed > this->_opts.maxMemoryUsageBytes)
spill();
}
@@ -614,7 +610,7 @@ public:
}
spill();
- Iterator* mergeIt = Iterator::merge(this->_iters, this->_fileName, _opts, _comp);
+ Iterator* mergeIt = Iterator::merge(this->_iters, this->_fileName, this->_opts, _comp);
_done = true;
return mergeIt;
}
@@ -648,20 +644,21 @@ private:
if (_data->empty())
return;
- if (!_opts.extSortAllowed) {
+ if (!this->_opts.extSortAllowed) {
// This error message only applies to sorts from user queries made through the find or
// aggregation commands. Other clients, such as bulk index builds, should suppress this
// error, either by allowing external sorting or by catching and throwing a more
// appropriate error.
uasserted(ErrorCodes::QueryExceededMemoryLimitNoDiskUseAllowed,
- str::stream() << "Sort exceeded memory limit of " << _opts.maxMemoryUsageBytes
- << " bytes, but did not opt in to external sorting.");
+ str::stream()
+ << "Sort exceeded memory limit of " << this->_opts.maxMemoryUsageBytes
+ << " bytes, but did not opt in to external sorting.");
}
sort();
SortedFileWriter<Key, Value> writer(
- _opts, this->_fileName, _nextSortedFileWriterOffset, _settings);
+ this->_opts, this->_fileName, _nextSortedFileWriterOffset, _settings);
for (; !_data->empty(); _data->pop_front()) {
writer.addAlreadySorted(_data->front().first, _data->front().second);
}
@@ -675,7 +672,6 @@ private:
const Comparator _comp;
const Settings _settings;
- SortOptions _opts;
std::streampos _nextSortedFileWriterOffset = 0;
bool _done = false;
size_t _memUsed = 0;
@@ -741,19 +737,15 @@ public:
TopKSorter(const SortOptions& opts,
const Comparator& comp,
const Settings& settings = Settings())
- : _comp(comp),
+ : Sorter<Key, Value>(opts),
+ _comp(comp),
_settings(settings),
- _opts(opts),
_memUsed(0),
_haveCutoff(false),
_worstCount(0),
_medianCount(0) {
// This also *works* with limit==1 but LimitOneSorter should be used instead
- verify(_opts.limit > 1);
-
- if (_opts.extSortAllowed) {
- this->_fileName = _opts.tempDir + "/" + nextFileName();
- }
+ invariant(opts.limit > 1);
// Preallocate a fixed sized vector of the required size if we don't expect it to have a
// major impact on our memory budget. This is the common case with small limits.
@@ -778,7 +770,7 @@ public:
STLComparator less(_comp);
Data contender(key, val);
- if (_data.size() < _opts.limit) {
+ if (_data.size() < this->_opts.limit) {
if (_haveCutoff && !less(contender, _cutoff))
return;
@@ -787,16 +779,16 @@ public:
_memUsed += key.memUsageForSorter();
_memUsed += val.memUsageForSorter();
- if (_data.size() == _opts.limit)
+ if (_data.size() == this->_opts.limit)
std::make_heap(_data.begin(), _data.end(), less);
- if (_memUsed > _opts.maxMemoryUsageBytes)
+ if (_memUsed > this->_opts.maxMemoryUsageBytes)
spill();
return;
}
- verify(_data.size() == _opts.limit);
+ invariant(_data.size() == this->_opts.limit);
if (!less(contender, _data.front()))
return; // not good enough
@@ -813,7 +805,7 @@ public:
_data.back() = {contender.first.getOwned(), contender.second.getOwned()};
std::push_heap(_data.begin(), _data.end(), less);
- if (_memUsed > _opts.maxMemoryUsageBytes)
+ if (_memUsed > this->_opts.maxMemoryUsageBytes)
spill();
}
@@ -824,7 +816,7 @@ public:
}
spill();
- Iterator* iterator = Iterator::merge(this->_iters, this->_fileName, _opts, _comp);
+ Iterator* iterator = Iterator::merge(this->_iters, this->_fileName, this->_opts, _comp);
_done = true;
return iterator;
}
@@ -845,7 +837,7 @@ private:
void sort() {
STLComparator less(_comp);
- if (_data.size() == _opts.limit) {
+ if (_data.size() == this->_opts.limit) {
std::sort_heap(_data.begin(), _data.end(), less);
} else {
std::stable_sort(_data.begin(), _data.end(), less);
@@ -914,14 +906,14 @@ private:
// Promote _worstSeen or _lastMedian to _cutoff and reset counters if should.
- if (_worstCount >= _opts.limit) {
+ if (_worstCount >= this->_opts.limit) {
if (!_haveCutoff || less(_worstSeen, _cutoff)) {
_cutoff = _worstSeen;
_haveCutoff = true;
}
_worstCount = 0;
}
- if (_medianCount >= _opts.limit) {
+ if (_medianCount >= this->_opts.limit) {
if (!_haveCutoff || less(_lastMedian, _cutoff)) {
_cutoff = _lastMedian;
_haveCutoff = true;
@@ -937,13 +929,13 @@ private:
if (_data.empty())
return;
- if (!_opts.extSortAllowed) {
+ if (!this->_opts.extSortAllowed) {
// This error message only applies to sorts from user queries made through the find or
// aggregation commands. Other clients should suppress this error, either by allowing
// external sorting or by catching and throwing a more appropriate error.
uasserted(ErrorCodes::QueryExceededMemoryLimitNoDiskUseAllowed,
str::stream()
- << "Sort exceeded memory limit of " << _opts.maxMemoryUsageBytes
+ << "Sort exceeded memory limit of " << this->_opts.maxMemoryUsageBytes
<< " bytes, but did not opt in to external sorting. Aborting operation."
<< " Pass allowDiskUse:true to opt in.");
}
@@ -955,7 +947,7 @@ private:
updateCutoff();
SortedFileWriter<Key, Value> writer(
- _opts, this->_fileName, _nextSortedFileWriterOffset, _settings);
+ this->_opts, this->_fileName, _nextSortedFileWriterOffset, _settings);
for (size_t i = 0; i < _data.size(); i++) {
writer.addAlreadySorted(_data[i].first, _data[i].second);
}
@@ -972,7 +964,6 @@ private:
const Comparator _comp;
const Settings _settings;
- SortOptions _opts;
std::streampos _nextSortedFileWriterOffset = 0;
bool _done = false;
size_t _memUsed;
@@ -992,6 +983,10 @@ private:
} // namespace sorter
template <typename Key, typename Value>
+Sorter<Key, Value>::Sorter(const SortOptions& opts)
+ : Sorter(opts, opts.extSortAllowed ? opts.tempDir + "/" + nextFileName() : "") {}
+
+template <typename Key, typename Value>
std::vector<SorterRange> Sorter<Key, Value>::_getRanges() const {
std::vector<SorterRange> ranges;
ranges.reserve(_iters.size());
@@ -1185,8 +1180,6 @@ Sorter<Key, Value>* Sorter<Key, Value>::makeFromExistingRanges(
"NoLimitSorter (limit 0), but got limit "
<< opts.limit);
- invariant(!ranges.empty());
-
return new sorter::NoLimitSorter<Key, Value, Comparator>(
fileName, ranges, opts, comp, settings);
}
diff --git a/src/mongo/db/sorter/sorter.h b/src/mongo/db/sorter/sorter.h
index 1debcf5e318..d301c4c6205 100644
--- a/src/mongo/db/sorter/sorter.h
+++ b/src/mongo/db/sorter/sorter.h
@@ -105,7 +105,7 @@ struct SortOptions {
bool extSortAllowed;
// Directory into which we place a file when spilling to disk. Must be explicitly set if
- // extSortAllowed is true, unless constructing a Sorter from an existing file and ranges.
+ // extSortAllowed is true.
std::string tempDir;
SortOptions() : limit(0), maxMemoryUsageBytes(64 * 1024 * 1024), extSortAllowed(false) {}
@@ -223,10 +223,16 @@ public:
Settings;
struct PersistedState {
+ std::string tempDir;
std::string fileName;
std::vector<SorterRange> ranges;
};
+ Sorter(const SortOptions& opts);
+
+ Sorter(const SortOptions& opts, const std::string& fileName)
+ : _opts(opts), _fileName(fileName) {}
+
template <typename Comparator>
static Sorter* make(const SortOptions& opts,
const Comparator& comp,
@@ -257,7 +263,7 @@ public:
}
PersistedState getPersistedState() const {
- return {_fileName, _getRanges()};
+ return {_opts.tempDir, _fileName, _getRanges()};
}
void persistDataForShutdown();
@@ -274,6 +280,7 @@ protected:
// Whether the files written by this Sorter should be kept on destruction.
bool _shouldKeepFilesOnDestruction = false;
+ SortOptions _opts;
std::string _fileName;
std::vector<std::shared_ptr<Iterator>> _iters; // Data that has already been spilled.
diff --git a/src/mongo/db/sorter/sorter_test.cpp b/src/mongo/db/sorter/sorter_test.cpp
index 2b2d2d77992..4de5eb031f8 100644
--- a/src/mongo/db/sorter/sorter_test.cpp
+++ b/src/mongo/db/sorter/sorter_test.cpp
@@ -482,6 +482,7 @@ private:
void assertRangeInfo(unowned_ptr<IWSorter> sorter, const SortOptions& opts) {
auto state = sorter->getPersistedState();
if (opts.extSortAllowed) {
+ ASSERT_EQ(state.tempDir, opts.tempDir);
ASSERT_STRING_CONTAINS(state.fileName, opts.tempDir);
}
if (auto numRanges = correctNumRanges()) {