diff options
20 files changed, 289 insertions, 225 deletions
diff --git a/jstests/noPassthrough/libs/index_build.js b/jstests/noPassthrough/libs/index_build.js index 6f9e8cfd6a4..5f345fa3161 100644 --- a/jstests/noPassthrough/libs/index_build.js +++ b/jstests/noPassthrough/libs/index_build.js @@ -224,39 +224,36 @@ const ResumableIndexBuildTest = class { } /** - * Creates the specified index in a parallel shell and expects it to fail with - * InterruptedDueToReplStateChange. After calling this function and before waiting on the - * returned parallel shell to complete, the test should cause the node to go through a replica - * set state transition. + * Runs createIndexFn in a parellel shell to create an index, inserting the documents specified + * by sideWrites into the side writes table. createIndexFn should take three parameters: + * collection name, index specification, and index name. If {hangBeforeBuildingIndex: true}, + * returns with the hangBeforeBuildingIndex failpoint enabled and the index build hanging at + * this point. */ - static createIndex(primary, collName, indexSpec, indexName) { - return startParallelShell( - funWithArgs(function(collName, indexSpec, indexName) { - assert.commandFailedWithCode( - db.getCollection(collName).createIndex(indexSpec, {name: indexName}), - ErrorCodes.InterruptedDueToReplStateChange); - }, collName, indexSpec, indexName), primary.port); - } + static createIndexWithSideWrites(test, + createIndexFn, + coll, + indexSpec, + indexName, + sideWrites, + {hangBeforeBuildingIndex} = {hangBeforeBuildingIndex: false}) { + const primary = test.getPrimary(); + const fp = configureFailPoint(primary, "hangBeforeBuildingIndex"); - /** - * Inserts the given documents once an index build reaches the end of the bulk load phase so - * that the documents are inserted into the side writes table for that index build. - */ - static insertIntoSideWritesTable(primary, collName, docs) { - return startParallelShell(funWithArgs(function(collName, docs) { - if (!docs) - return; + const awaitCreateIndex = startParallelShell( + funWithArgs(createIndexFn, coll.getName(), indexSpec, indexName), primary.port); - load("jstests/libs/fail_point_util.js"); + fp.wait(); + assert.commandWorked(coll.insert(sideWrites)); - const sideWritesFp = configureFailPoint( - db.getMongo(), "hangAfterSettingUpIndexBuild"); - sideWritesFp.wait(); + // Before building the index, wait for the the last op to be committed so that establishing + // the majority read cursor does not race with step down. + test.awaitLastOpCommitted(); - assert.commandWorked(db.getCollection(collName).insert(docs)); + if (!hangBeforeBuildingIndex) + fp.off(); - sideWritesFp.off(); - }, collName, docs), primary.port); + return awaitCreateIndex; } /** @@ -286,11 +283,7 @@ const ResumableIndexBuildTest = class { .buildUUID); // Don't interrupt the index build for shutdown until it is at the desired point. - const shutdownFpTimesEntered = - assert - .commandWorked( - conn.adminCommand({configureFailPoint: "hangBeforeShutdown", mode: "alwaysOn"})) - .count; + const shutdownFpTimesEntered = configureFailPoint(conn, "hangBeforeShutdown").timesEntered; const awaitContinueShutdown = startParallelShell( funWithArgs(function(failPointName, failPointData, shutdownFpTimesEntered) { @@ -307,8 +300,8 @@ const ResumableIndexBuildTest = class { // Move the index build forward to the point that we want it to be interrupted for // shutdown at. const fp = configureFailPoint(db.getMongo(), failPointName, failPointData); - assert.commandWorked(db.adminCommand( - {configureFailPoint: "hangAfterSettingUpIndexBuildUnlocked", mode: "off"})); + assert.commandWorked( + db.adminCommand({configureFailPoint: "hangBeforeBuildingIndex", mode: "off"})); fp.wait(); // Disabling this failpoint will allow shutdown to continue and cause the operation @@ -343,9 +336,7 @@ const ResumableIndexBuildTest = class { rst.awaitReplication(); - if (postIndexBuildInserts) { - assert.commandWorked(coll.insert(postIndexBuildInserts)); - } + assert.commandWorked(coll.insert(postIndexBuildInserts)); for (const node of rst.nodes) { const collection = node.getDB(dbName).getCollection(collName); @@ -357,9 +348,9 @@ const ResumableIndexBuildTest = class { /** * Runs the resumable index build test specified by the provided failpoint information and - * index spec on the provided replica set and namespace. Document(s) specified by - * insertIntoSideWritesTable will be inserted during the collection scan phase so that they are - * inserted into the side writes table and processed during the drain writes phase. + * index spec on the provided replica set and namespace. Documents specified by sideWrites will + * be inserted after the bulk load phase so that they are inserted into the side writes table + * and processed during the drain writes phase. */ static run(rst, dbName, @@ -367,8 +358,8 @@ const ResumableIndexBuildTest = class { indexSpec, failPointName, failPointData, - insertIntoSideWritesTable, - postIndexBuildInserts = {}) { + sideWrites = [], + postIndexBuildInserts = []) { const primary = rst.getPrimary(); if (!ResumableIndexBuildTest.resumableIndexBuildsEnabled(primary)) { @@ -379,20 +370,16 @@ const ResumableIndexBuildTest = class { const coll = primary.getDB(dbName).getCollection(collName); const indexName = "resumable_index_build"; - const fp = configureFailPoint(primary, "hangAfterSettingUpIndexBuildUnlocked"); - - const awaitInsertIntoSideWritesTable = ResumableIndexBuildTest.insertIntoSideWritesTable( - primary, collName, insertIntoSideWritesTable); - - const awaitCreateIndex = - ResumableIndexBuildTest.createIndex(primary, coll.getName(), indexSpec, indexName); - - fp.wait(); + const awaitCreateIndex = ResumableIndexBuildTest.createIndexWithSideWrites( + rst, function(collName, indexSpec, indexName) { + assert.commandFailedWithCode( + db.getCollection(collName).createIndex(indexSpec, {name: indexName}), + ErrorCodes.InterruptedDueToReplStateChange); + }, coll, indexSpec, indexName, sideWrites, {hangBeforeBuildingIndex: true}); ResumableIndexBuildTest.restart( rst, primary, coll, indexName, failPointName, failPointData); - awaitInsertIntoSideWritesTable(); awaitCreateIndex(); ResumableIndexBuildTest.checkIndexes( @@ -402,9 +389,9 @@ const ResumableIndexBuildTest = class { /** * Runs the resumable index build test specified by the provided failpoint information and * index spec on the provided replica set and namespace. This will specifically be used to test - * that resuming an index build on the former primary works. Document(s) specified by - * insertIntoSideWritesTable will be inserted during the collection scan phase so that they are - * inserted into the side writes table and processed during the drain writes phase. + * that resuming an index build on the former primary works. Documents specified by sideWrites + * will be inserted during the collection scan phase so that they are inserted into the side + * writes table and processed during the drain writes phase. */ static runOnPrimaryToTestCommitQuorum(rst, dbName, @@ -412,8 +399,8 @@ const ResumableIndexBuildTest = class { indexSpec, resumeNodeFailPointName, otherNodeFailPointName, - insertIntoSideWritesTable, - postIndexBuildInserts) { + sideWrites = [], + postIndexBuildInserts = []) { const resumeNode = rst.getPrimary(); const resumeDB = resumeNode.getDB(dbName); @@ -423,17 +410,18 @@ const ResumableIndexBuildTest = class { } const secondary = rst.getSecondary(); - let coll = resumeDB.getCollection(collName); + const coll = resumeDB.getCollection(collName); const indexName = "resumable_index_build"; const resumeNodeFp = configureFailPoint(resumeNode, resumeNodeFailPointName); const otherNodeFp = configureFailPoint(secondary, otherNodeFailPointName); - const awaitInsertIntoSideWritesTable = ResumableIndexBuildTest.insertIntoSideWritesTable( - resumeNode, collName, insertIntoSideWritesTable); - - const awaitCreateIndex = - ResumableIndexBuildTest.createIndex(resumeNode, coll.getName(), indexSpec, indexName); + const awaitCreateIndex = ResumableIndexBuildTest.createIndexWithSideWrites( + rst, function(collName, indexSpec, indexName) { + assert.commandFailedWithCode( + db.getCollection(collName).createIndex(indexSpec, {name: indexName}), + ErrorCodes.InterruptedDueToReplStateChange); + }, coll, indexSpec, indexName, sideWrites); otherNodeFp.wait(); resumeNodeFp.wait(); @@ -443,8 +431,6 @@ const ResumableIndexBuildTest = class { .assertIndexes(coll, 2, ["_id_"], [indexName], {includeBuildUUIDs: true})[indexName] .buildUUID); - awaitInsertIntoSideWritesTable(); - clearRawMongoProgramOutput(); rst.stop(resumeNode); assert(RegExp("4841502.*" + buildUUID).test(rawMongoProgramOutput())); @@ -465,9 +451,9 @@ const ResumableIndexBuildTest = class { /** * Runs the resumable index build test specified by the provided failpoint information and * index spec on the provided replica set and namespace. This will specifically be used to test - * that resuming an index build on a secondary works. Document(s) specified by - * insertIntoSideWritesTable will be inserted during the collection scan phase so that they are - * inserted into the side writes table and processed during the drain writes phase. + * that resuming an index build on a secondary works. Documents specified by sideWrites will be + * inserted during the collection scan phase so that they are inserted into the side writes + * table and processed during the drain writes phase. */ static runOnSecondary(rst, dbName, @@ -476,37 +462,30 @@ const ResumableIndexBuildTest = class { resumeNodeFailPointName, resumeNodeFailPointData, primaryFailPointName, - insertIntoSideWritesTable, - postIndexBuildInserts) { + sideWrites = [], + postIndexBuildInserts = []) { const primary = rst.getPrimary(); const coll = primary.getDB(dbName).getCollection(collName); const indexName = "resumable_index_build"; const resumeNode = rst.getSecondary(); const resumeNodeColl = resumeNode.getDB(dbName).getCollection(collName); - const resumeNodeFp = configureFailPoint(resumeNode, "hangAfterSettingUpIndexBuildUnlocked"); - const sideWritesFp = configureFailPoint(resumeNode, "hangAfterSettingUpIndexBuild"); + const resumeNodeFp = configureFailPoint(resumeNode, "hangBeforeBuildingIndex"); let primaryFp; if (primaryFailPointName) { primaryFp = configureFailPoint(primary, primaryFailPointName); } - const awaitCreateIndex = startParallelShell( - funWithArgs(function(collName, indexSpec, indexName) { + const awaitCreateIndex = ResumableIndexBuildTest.createIndexWithSideWrites( + rst, function(collName, indexSpec, indexName) { // If the secondary is shutdown for too long, the primary will step down until it // can reach the secondary again. In this case, the index build will continue in the // background. assert.commandWorkedOrFailedWithCode( db.getCollection(collName).createIndex(indexSpec, {name: indexName}), ErrorCodes.InterruptedDueToReplStateChange); - }, collName, indexSpec, indexName), primary.port); - - // Make sure that the resumeNode has paused its index build before inserting the writes - // intended for the side writes table. - sideWritesFp.wait(); - assert.commandWorked(coll.insert(insertIntoSideWritesTable, {"writeConcern": {w: 2}})); - sideWritesFp.off(); + }, coll, indexSpec, indexName, sideWrites); resumeNodeFp.wait(); diff --git a/jstests/noPassthrough/resumable_index_build_initialized.js b/jstests/noPassthrough/resumable_index_build_initialized.js new file mode 100644 index 00000000000..354bb332452 --- /dev/null +++ b/jstests/noPassthrough/resumable_index_build_initialized.js @@ -0,0 +1,28 @@ +/** + * Tests that resumable index build state is written to disk upon clean shutdown when an index + * build has been initialized but has not yet begun the collection scan phase. + * + * @tags: [ + * requires_persistence, + * requires_replication, + * ] + */ +(function() { +"use strict"; + +load("jstests/noPassthrough/libs/index_build.js"); + +const dbName = "test"; +const failPointName = "hangIndexBuildBeforeWaitingUntilMajorityOpTime"; + +const rst = new ReplSetTest({nodes: 1}); +rst.startSet(); +rst.initiate(); + +const coll = rst.getPrimary().getDB(dbName).getCollection(jsTestName()); +assert.commandWorked(coll.insert({a: 1})); + +ResumableIndexBuildTest.run(rst, dbName, coll.getName(), {a: 1}, failPointName, {}); + +rst.stopSet(); +})();
\ No newline at end of file diff --git a/jstests/replsets/libs/rollback_resumable_index_build.js b/jstests/replsets/libs/rollback_resumable_index_build.js index a1ff7e4e65f..0ab2148e783 100644 --- a/jstests/replsets/libs/rollback_resumable_index_build.js +++ b/jstests/replsets/libs/rollback_resumable_index_build.js @@ -7,9 +7,8 @@ const RollbackResumableIndexBuildTest = class { * rollback starts is specified by rollbackStartFailPointName. The phase that the index build * will resume from after rollback completes is specified by rollbackEndFailPointName. If * either of these points is in the drain writes phase, documents to insert into the side - * writes table must be specified by insertIntoSideWritesTable. Documents specified by - * insertsToBeRolledBack are inserted after transitioning to rollback operations and will be - * rolled back. + * writes table must be specified by sideWrites. Documents specified by insertsToBeRolledBack + * are inserted after transitioning to rollback operations and will be rolled back. */ static run(rollbackTest, dbName, @@ -20,7 +19,7 @@ const RollbackResumableIndexBuildTest = class { rollbackEndFailPointName, rollbackEndFailPointData, insertsToBeRolledBack, - insertIntoSideWritesTable) { + sideWrites = []) { const originalPrimary = rollbackTest.getPrimary(); if (!ResumableIndexBuildTest.resumableIndexBuildsEnabled(originalPrimary)) { @@ -38,15 +37,15 @@ const RollbackResumableIndexBuildTest = class { const rollbackStartFp = configureFailPoint( originalPrimary, rollbackStartFailPointName, rollbackStartFailPointData); - const awaitInsertIntoSideWritesTable = ResumableIndexBuildTest.insertIntoSideWritesTable( - originalPrimary, collName, insertIntoSideWritesTable); - - const awaitCreateIndex = - ResumableIndexBuildTest.createIndex(originalPrimary, collName, indexSpec, indexName); + const awaitCreateIndex = ResumableIndexBuildTest.createIndexWithSideWrites( + rollbackTest, function(collName, indexSpec, indexName) { + assert.commandFailedWithCode( + db.getCollection(collName).createIndex(indexSpec, {name: indexName}), + ErrorCodes.InterruptedDueToReplStateChange); + }, coll, indexSpec, indexName, sideWrites); // Wait until we've completed the last operation that won't be rolled back so that we can // begin the operations that will be rolled back. - awaitInsertIntoSideWritesTable(); rollbackEndFp.wait(); const buildUUID = extractUUIDFromObject( @@ -120,9 +119,9 @@ const RollbackResumableIndexBuildTest = class { * Runs the resumable index build rollback test in the case where rollback begins after the * index build has already completed. The point during the index build to roll back to is * specified by rollbackEndFailPointName. If this point is in the drain writes phase, documents - * to insert into the side writes table must be specified by insertIntoSideWritesTable. - * Documents specified by insertsToBeRolledBack are inserted after transitioning to rollback - * operations and will be rolled back. + * to insert into the side writes table must be specified by sideWrites. Documents specified by + * insertsToBeRolledBack are inserted after transitioning to rollback operations and will be + * rolled back. */ static runIndexBuildComplete(rollbackTest, dbName, @@ -131,7 +130,7 @@ const RollbackResumableIndexBuildTest = class { rollbackEndFailPointName, rollbackEndFailPointData, insertsToBeRolledBack, - insertIntoSideWritesTable) { + sideWrites = []) { const originalPrimary = rollbackTest.getPrimary(); if (!ResumableIndexBuildTest.resumableIndexBuildsEnabled(originalPrimary)) { @@ -147,23 +146,19 @@ const RollbackResumableIndexBuildTest = class { const rollbackEndFp = configureFailPoint(originalPrimary, rollbackEndFailPointName, rollbackEndFailPointData); - const awaitInsertIntoSideWritesTable = ResumableIndexBuildTest.insertIntoSideWritesTable( - originalPrimary, collName, insertIntoSideWritesTable); - - const awaitCreateIndex = - startParallelShell(funWithArgs(function(collName, indexSpec, indexName) { - assert.commandWorked(db.runCommand({ - createIndexes: collName, - indexes: [{key: indexSpec, name: indexName}], - // Commit quorum is disabled so that the index build can - // complete while the primary is isolated and will roll back. - commitQuorum: 0 - })); - }, collName, indexSpec, indexName), originalPrimary.port); + const awaitCreateIndex = ResumableIndexBuildTest.createIndexWithSideWrites( + rollbackTest, function(collName, indexSpec, indexName) { + assert.commandWorked(db.runCommand({ + createIndexes: collName, + indexes: [{key: indexSpec, name: indexName}], + // Commit quorum is disabled so that the index build can + // complete while the primary is isolated and will roll back. + commitQuorum: 0 + })); + }, coll, indexSpec, indexName, sideWrites); // Wait until we reach the desired ending point so that we can begin the operations that // will be rolled back. - awaitInsertIntoSideWritesTable(); rollbackEndFp.wait(); const buildUUID = extractUUIDFromObject( diff --git a/src/mongo/db/catalog/collection.h b/src/mongo/db/catalog/collection.h index 9b0d5f5b223..e73633c6d65 100644 --- a/src/mongo/db/catalog/collection.h +++ b/src/mongo/db/catalog/collection.h @@ -548,7 +548,8 @@ public: virtual std::unique_ptr<PlanExecutor, PlanExecutor::Deleter> makePlanExecutor( OperationContext* opCtx, PlanYieldPolicy::YieldPolicy yieldPolicy, - ScanDirection scanDirection) const = 0; + ScanDirection scanDirection, + boost::optional<RecordId> resumeAfterRecordId = boost::none) const = 0; virtual void indexBuildSuccess(OperationContext* opCtx, IndexCatalogEntry* index) = 0; diff --git a/src/mongo/db/catalog/collection_impl.cpp b/src/mongo/db/catalog/collection_impl.cpp index 3aeebc627b5..5b8b8a0b1e7 100644 --- a/src/mongo/db/catalog/collection_impl.cpp +++ b/src/mongo/db/catalog/collection_impl.cpp @@ -1209,10 +1209,12 @@ StatusWith<std::vector<BSONObj>> CollectionImpl::addCollationDefaultsToIndexSpec std::unique_ptr<PlanExecutor, PlanExecutor::Deleter> CollectionImpl::makePlanExecutor( OperationContext* opCtx, PlanYieldPolicy::YieldPolicy yieldPolicy, - ScanDirection scanDirection) const { + ScanDirection scanDirection, + boost::optional<RecordId> resumeAfterRecordId) const { auto isForward = scanDirection == ScanDirection::kForward; auto direction = isForward ? InternalPlanner::FORWARD : InternalPlanner::BACKWARD; - return InternalPlanner::collectionScan(opCtx, _ns.ns(), this, yieldPolicy, direction); + return InternalPlanner::collectionScan( + opCtx, _ns.ns(), this, yieldPolicy, direction, resumeAfterRecordId); } void CollectionImpl::setNs(NamespaceString nss) { diff --git a/src/mongo/db/catalog/collection_impl.h b/src/mongo/db/catalog/collection_impl.h index 573cda25612..d3ab6380954 100644 --- a/src/mongo/db/catalog/collection_impl.h +++ b/src/mongo/db/catalog/collection_impl.h @@ -353,7 +353,8 @@ public: std::unique_ptr<PlanExecutor, PlanExecutor::Deleter> makePlanExecutor( OperationContext* opCtx, PlanYieldPolicy::YieldPolicy yieldPolicy, - ScanDirection scanDirection) const final; + ScanDirection scanDirection, + boost::optional<RecordId> resumeAfterRecordId) const final; void indexBuildSuccess(OperationContext* opCtx, IndexCatalogEntry* index) final; diff --git a/src/mongo/db/catalog/collection_mock.h b/src/mongo/db/catalog/collection_mock.h index e07a110f8ef..54e2ab7b76f 100644 --- a/src/mongo/db/catalog/collection_mock.h +++ b/src/mongo/db/catalog/collection_mock.h @@ -274,7 +274,8 @@ public: std::unique_ptr<PlanExecutor, PlanExecutor::Deleter> makePlanExecutor( OperationContext* opCtx, PlanYieldPolicy::YieldPolicy yieldPolicy, - ScanDirection scanDirection) const { + ScanDirection scanDirection, + boost::optional<RecordId> resumeAfterRecordId) const { std::abort(); } diff --git a/src/mongo/db/catalog/index_builds_manager.cpp b/src/mongo/db/catalog/index_builds_manager.cpp index b8298ca598a..bfa9dd3bcf2 100644 --- a/src/mongo/db/catalog/index_builds_manager.cpp +++ b/src/mongo/db/catalog/index_builds_manager.cpp @@ -122,10 +122,11 @@ Status IndexBuildsManager::setUpIndexBuild(OperationContext* opCtx, Status IndexBuildsManager::startBuildingIndex(OperationContext* opCtx, Collection* collection, - const UUID& buildUUID) { + const UUID& buildUUID, + boost::optional<RecordId> resumeAfterRecordId) { auto builder = invariant(_getBuilder(buildUUID)); - return builder->insertAllDocumentsInCollection(opCtx, collection); + return builder->insertAllDocumentsInCollection(opCtx, collection, resumeAfterRecordId); } Status IndexBuildsManager::resumeBuildingIndexFromBulkLoadPhase(OperationContext* opCtx, diff --git a/src/mongo/db/catalog/index_builds_manager.h b/src/mongo/db/catalog/index_builds_manager.h index 3980e390543..8c61dc72069 100644 --- a/src/mongo/db/catalog/index_builds_manager.h +++ b/src/mongo/db/catalog/index_builds_manager.h @@ -97,7 +97,8 @@ public: */ Status startBuildingIndex(OperationContext* opCtx, Collection* collection, - const UUID& buildUUID); + const UUID& buildUUID, + boost::optional<RecordId> resumeAfterRecordId = boost::none); Status resumeBuildingIndexFromBulkLoadPhase(OperationContext* opCtx, const UUID& buildUUID); diff --git a/src/mongo/db/catalog/multi_index_block.cpp b/src/mongo/db/catalog/multi_index_block.cpp index 1b7f3f57ca4..8cd8b6d6222 100644 --- a/src/mongo/db/catalog/multi_index_block.cpp +++ b/src/mongo/db/catalog/multi_index_block.cpp @@ -74,32 +74,39 @@ MONGO_FAIL_POINT_DEFINE(leaveIndexBuildUnfinishedForShutdown); namespace { -void failPointHangDuringBuild(OperationContext* opCtx, - FailPoint* fp, - StringData where, - const BSONObj& doc, - unsigned long long iteration) { - fp->executeIf( - [=, &doc](const BSONObj& data) { - LOGV2(20386, - "Hanging index build during collection scan phase insertion", - "where"_attr = where, - "doc"_attr = doc); - - fp->pauseWhileSet(opCtx); - }, - [&doc, iteration](const BSONObj& data) { - if (data.hasField("iteration")) { - return iteration == static_cast<unsigned long long>(data["iteration"].numberLong()); - } +Status failPointHangDuringBuild(OperationContext* opCtx, + FailPoint* fp, + StringData where, + const BSONObj& doc, + unsigned long long iteration) { + try { + fp->executeIf( + [=, &doc](const BSONObj& data) { + LOGV2(20386, + "Hanging index build during collection scan phase insertion", + "where"_attr = where, + "doc"_attr = doc); + + fp->pauseWhileSet(opCtx); + }, + [&doc, iteration](const BSONObj& data) { + if (data.hasField("iteration")) { + return iteration == + static_cast<unsigned long long>(data["iteration"].numberLong()); + } - auto fieldsToMatch = data.getObjectField("fieldsToMatch"); - return std::all_of( - fieldsToMatch.begin(), fieldsToMatch.end(), [&doc](const auto& elem) { - return SimpleBSONElementComparator::kInstance.evaluate(elem == - doc[elem.fieldName()]); - }); - }); + auto fieldsToMatch = data.getObjectField("fieldsToMatch"); + return std::all_of( + fieldsToMatch.begin(), fieldsToMatch.end(), [&doc](const auto& elem) { + return SimpleBSONElementComparator::kInstance.evaluate( + elem == doc[elem.fieldName()]); + }); + }); + } catch (const ExceptionForCat<ErrorCategory::Interruption>&) { + return exceptionToStatus(); + } + + return Status::OK(); } } // namespace @@ -204,10 +211,7 @@ StatusWith<std::vector<BSONObj>> MultiIndexBlock::init( invariant(_indexes.empty()); - // TODO (SERVER-49409): Resume from the collection scan phase. - if (resumeInfo && - (resumeInfo->getPhase() == IndexBuildPhaseEnum::kBulkLoad || - resumeInfo->getPhase() == IndexBuildPhaseEnum::kDrainWrites)) { + if (resumeInfo) { _phase = resumeInfo->getPhase(); } @@ -311,8 +315,9 @@ StatusWith<std::vector<BSONObj>> MultiIndexBlock::init( index.bulk = index.real->initiateBulk( eachIndexBuildMaxMemoryUsageBytes, - // TODO (SERVER-49409): Resume from the collection scan phase. - resumeInfo && resumeInfo->getPhase() == IndexBuildPhaseEnum::kBulkLoad + // When resuming from the drain writes phase, there is no Sorter state to + // reconstruct. + resumeInfo && resumeInfo->getPhase() != IndexBuildPhaseEnum::kDrainWrites ? sorterInfo : boost::none); @@ -387,8 +392,10 @@ StatusWith<std::vector<BSONObj>> MultiIndexBlock::init( } } -Status MultiIndexBlock::insertAllDocumentsInCollection(OperationContext* opCtx, - Collection* collection) { +Status MultiIndexBlock::insertAllDocumentsInCollection( + OperationContext* opCtx, + Collection* collection, + boost::optional<RecordId> resumeAfterRecordId) { invariant(!_buildIsCleanedUp); invariant(opCtx->lockState()->isNoop() || !opCtx->lockState()->inAWriteUnitOfWork()); @@ -447,15 +454,18 @@ Status MultiIndexBlock::insertAllDocumentsInCollection(OperationContext* opCtx, } else { yieldPolicy = PlanYieldPolicy::YieldPolicy::WRITE_CONFLICT_RETRY_ONLY; } - auto exec = - collection->makePlanExecutor(opCtx, yieldPolicy, Collection::ScanDirection::kForward); + auto exec = collection->makePlanExecutor( + opCtx, yieldPolicy, Collection::ScanDirection::kForward, resumeAfterRecordId); // Hint to the storage engine that this collection scan should not keep data in the cache. bool readOnce = useReadOnceCursorsForIndexBuilds.load(); opCtx->recoveryUnit()->setReadOnce(readOnce); try { - invariant(_phase == IndexBuildPhaseEnum::kInitialized, + // The phase will be kCollectionScan when resuming an index build from the collection scan + // phase. + invariant(_phase == IndexBuildPhaseEnum::kInitialized || + _phase == IndexBuildPhaseEnum::kCollectionScan, IndexBuildPhase_serializer(_phase).toString()); _phase = IndexBuildPhaseEnum::kCollectionScan; @@ -466,7 +476,7 @@ Status MultiIndexBlock::insertAllDocumentsInCollection(OperationContext* opCtx, MONGO_unlikely(hangAfterStartingIndexBuild.shouldFail())) { auto interruptStatus = opCtx->checkForInterruptNoAssert(); if (!interruptStatus.isOK()) - return opCtx->checkForInterruptNoAssert(); + return interruptStatus; if (PlanExecutor::ADVANCED != state) { continue; @@ -474,11 +484,14 @@ Status MultiIndexBlock::insertAllDocumentsInCollection(OperationContext* opCtx, progress->setTotalWhileRunning(collection->numRecords(opCtx)); - failPointHangDuringBuild(opCtx, - &hangIndexBuildDuringCollectionScanPhaseBeforeInsertion, - "before", - objToIndex, - n); + interruptStatus = + failPointHangDuringBuild(opCtx, + &hangIndexBuildDuringCollectionScanPhaseBeforeInsertion, + "before", + objToIndex, + n); + if (!interruptStatus.isOK()) + return interruptStatus; // The external sorter is not part of the storage engine and therefore does not need a // WriteUnitOfWork to write keys. @@ -491,7 +504,8 @@ Status MultiIndexBlock::insertAllDocumentsInCollection(OperationContext* opCtx, &hangIndexBuildDuringCollectionScanPhaseAfterInsertion, "after", objToIndex, - n); + n) + .ignore(); // Go to the next document. progress->hit(); @@ -891,10 +905,12 @@ BSONObj MultiIndexBlock::_constructStateObject() const { BSONObjBuilder indexInfo(indexesArray.subobjStart()); - if (_phase == IndexBuildPhaseEnum::kCollectionScan || - _phase == IndexBuildPhaseEnum::kBulkLoad) { + if (_phase != IndexBuildPhaseEnum::kDrainWrites) { auto state = index.bulk->getPersistedSorterState(); + // TODO (SERVER-50289): Consider not including tempDir in the persisted resumable index + // build state. + indexInfo.append("tempDir", state.tempDir); indexInfo.append("fileName", state.fileName); indexInfo.append("numKeys", index.bulk->getKeysInserted()); diff --git a/src/mongo/db/catalog/multi_index_block.h b/src/mongo/db/catalog/multi_index_block.h index e375fffd094..5e52419dd1c 100644 --- a/src/mongo/db/catalog/multi_index_block.h +++ b/src/mongo/db/catalog/multi_index_block.h @@ -149,7 +149,10 @@ public: * * Should not be called inside of a WriteUnitOfWork. */ - Status insertAllDocumentsInCollection(OperationContext* opCtx, Collection* collection); + Status insertAllDocumentsInCollection( + OperationContext* opCtx, + Collection* collection, + boost::optional<RecordId> resumeAfterRecordId = boost::none); /** * Call this after init() for each document in the collection. diff --git a/src/mongo/db/index/index_access_method.cpp b/src/mongo/db/index/index_access_method.cpp index 2489279cb76..7cd46f8093b 100644 --- a/src/mongo/db/index/index_access_method.cpp +++ b/src/mongo/db/index/index_access_method.cpp @@ -506,12 +506,14 @@ AbstractIndexAccessMethod::BulkBuilderImpl::BulkBuilderImpl(IndexCatalogEntry* i size_t maxMemoryUsageBytes, const IndexSorterInfo& sorterInfo) : _indexCatalogEntry(index), - _sorter(Sorter::makeFromExistingRanges( - sorterInfo.getFileName()->toString(), - *sorterInfo.getRanges(), - SortOptions().ExtSortAllowed().MaxMemoryUsageBytes(maxMemoryUsageBytes), - BtreeExternalSortComparison(), - _makeSorterSettings())), + _sorter(Sorter::makeFromExistingRanges(sorterInfo.getFileName()->toString(), + *sorterInfo.getRanges(), + SortOptions() + .TempDir(sorterInfo.getTempDir()->toString()) + .ExtSortAllowed() + .MaxMemoryUsageBytes(maxMemoryUsageBytes), + BtreeExternalSortComparison(), + _makeSorterSettings())), _keysInserted(*sorterInfo.getNumKeys()) {} Status AbstractIndexAccessMethod::BulkBuilderImpl::insert(OperationContext* opCtx, diff --git a/src/mongo/db/index_builds_coordinator.cpp b/src/mongo/db/index_builds_coordinator.cpp index cd4b017a3f0..0b0bae768b3 100644 --- a/src/mongo/db/index_builds_coordinator.cpp +++ b/src/mongo/db/index_builds_coordinator.cpp @@ -78,6 +78,8 @@ MONGO_FAIL_POINT_DEFINE(hangIndexBuildBeforeAbortCleanUp); MONGO_FAIL_POINT_DEFINE(hangIndexBuildOnStepUp); MONGO_FAIL_POINT_DEFINE(hangAfterSettingUpResumableIndexBuild); MONGO_FAIL_POINT_DEFINE(hangIndexBuildBeforeCommit); +MONGO_FAIL_POINT_DEFINE(hangBeforeBuildingIndex); +MONGO_FAIL_POINT_DEFINE(hangIndexBuildBeforeWaitingUntilMajorityOpTime); namespace { @@ -2262,10 +2264,7 @@ void IndexBuildsCoordinator::_runIndexBuildInner( hangAfterInitializingIndexBuild.pauseWhileSet(opCtx); } - // TODO (SERVER-49409): Resume from the collection scan phase. - if (resumeInfo && - (resumeInfo->getPhase() == IndexBuildPhaseEnum::kBulkLoad || - resumeInfo->getPhase() == IndexBuildPhaseEnum::kDrainWrites)) { + if (resumeInfo) { _resumeIndexBuildFromPhase(opCtx, replState, indexBuildOptions, resumeInfo.get()); } else { _buildIndex(opCtx, replState, indexBuildOptions); @@ -2339,8 +2338,17 @@ void IndexBuildsCoordinator::_resumeIndexBuildFromPhase( hangAfterSettingUpResumableIndexBuild.pauseWhileSet(); } - if (resumeInfo.getPhase() == IndexBuildPhaseEnum::kBulkLoad) + if (resumeInfo.getPhase() == IndexBuildPhaseEnum::kInitialized || + resumeInfo.getPhase() == IndexBuildPhaseEnum::kCollectionScan) { + _scanCollectionAndInsertSortedKeysIntoIndex( + opCtx, + replState, + resumeInfo.getCollectionScanPosition() + ? boost::make_optional<RecordId>(RecordId(*resumeInfo.getCollectionScanPosition())) + : boost::none); + } else if (resumeInfo.getPhase() == IndexBuildPhaseEnum::kBulkLoad) { _insertSortedKeysIntoIndexForResume(opCtx, replState); + } _insertKeysFromSideTablesWithoutBlockingWrites(opCtx, replState); _signalPrimaryForCommitReadiness(opCtx, replState); @@ -2364,6 +2372,13 @@ void IndexBuildsCoordinator::_awaitLastOpTimeBeforeInterceptorsMajorityCommitted "buildUUID"_attr = replState->buildUUID, "lastOpTime"_attr = replState->lastOpTimeBeforeInterceptors); + if (MONGO_unlikely(hangIndexBuildBeforeWaitingUntilMajorityOpTime.shouldFail())) { + LOGV2(4940901, + "Hanging index build before waiting for the last optime before interceptors to be " + "majority committed due to hangIndexBuildBeforeWaitingUntilMajorityOpTime failpoint"); + hangIndexBuildBeforeWaitingUntilMajorityOpTime.pauseWhileSet(opCtx); + } + auto status = replCoord->waitUntilMajorityOpTime(opCtx, replState->lastOpTimeBeforeInterceptors); uassertStatusOK(status); @@ -2377,6 +2392,11 @@ void IndexBuildsCoordinator::_awaitLastOpTimeBeforeInterceptorsMajorityCommitted void IndexBuildsCoordinator::_buildIndex(OperationContext* opCtx, std::shared_ptr<ReplIndexBuildState> replState, const IndexBuildOptions& indexBuildOptions) { + if (MONGO_unlikely(hangBeforeBuildingIndex.shouldFail())) { + LOGV2(4940900, "Hanging before building index due to hangBeforeBuildingIndex failpoint"); + hangBeforeBuildingIndex.pauseWhileSet(); + } + // Read without a timestamp. When we commit, we block writes which guarantees all writes are // visible. opCtx->recoveryUnit()->setTimestampReadSource(RecoveryUnit::ReadSource::kNoTimestamp); @@ -2394,7 +2414,9 @@ void IndexBuildsCoordinator::_buildIndex(OperationContext* opCtx, * Second phase is extracting the sorted keys and writing them into the new index table. */ void IndexBuildsCoordinator::_scanCollectionAndInsertSortedKeysIntoIndex( - OperationContext* opCtx, std::shared_ptr<ReplIndexBuildState> replState) { + OperationContext* opCtx, + std::shared_ptr<ReplIndexBuildState> replState, + boost::optional<RecordId> resumeAfterRecordId) { // Collection scan and insert into index. { @@ -2418,8 +2440,8 @@ void IndexBuildsCoordinator::_scanCollectionAndInsertSortedKeysIntoIndex( CollectionCatalog::get(opCtx).lookupCollectionByUUID(opCtx, replState->collectionUUID); invariant(collection); - uassertStatusOK( - _indexBuildsManager.startBuildingIndex(opCtx, collection, replState->buildUUID)); + uassertStatusOK(_indexBuildsManager.startBuildingIndex( + opCtx, collection, replState->buildUUID, resumeAfterRecordId)); } if (MONGO_unlikely(hangAfterIndexBuildDumpsInsertsFromBulk.shouldFail())) { diff --git a/src/mongo/db/index_builds_coordinator.h b/src/mongo/db/index_builds_coordinator.h index bbddc12f2dc..ead5e5e39b1 100644 --- a/src/mongo/db/index_builds_coordinator.h +++ b/src/mongo/db/index_builds_coordinator.h @@ -653,7 +653,9 @@ protected: * Second phase is extracting the sorted keys and writing them into the new index table. */ void _scanCollectionAndInsertSortedKeysIntoIndex( - OperationContext* opCtx, std::shared_ptr<ReplIndexBuildState> replState); + OperationContext* opCtx, + std::shared_ptr<ReplIndexBuildState> replState, + boost::optional<RecordId> resumeAfterRecordId = boost::none); /** * Performs the second phase of the index build, for use when resuming from the second phase. */ diff --git a/src/mongo/db/query/internal_plans.cpp b/src/mongo/db/query/internal_plans.cpp index 21a76b11120..264a467492f 100644 --- a/src/mongo/db/query/internal_plans.cpp +++ b/src/mongo/db/query/internal_plans.cpp @@ -53,7 +53,8 @@ std::unique_ptr<PlanExecutor, PlanExecutor::Deleter> InternalPlanner::collection StringData ns, const Collection* collection, PlanYieldPolicy::YieldPolicy yieldPolicy, - const Direction direction) { + const Direction direction, + boost::optional<RecordId> resumeAfterRecordId) { std::unique_ptr<WorkingSet> ws = std::make_unique<WorkingSet>(); auto expCtx = make_intrusive<ExpressionContext>( @@ -70,7 +71,7 @@ std::unique_ptr<PlanExecutor, PlanExecutor::Deleter> InternalPlanner::collection invariant(ns == collection->ns().ns()); - auto cs = _collectionScan(expCtx, ws.get(), collection, direction); + auto cs = _collectionScan(expCtx, ws.get(), collection, direction, resumeAfterRecordId); // Takes ownership of 'ws' and 'cs'. auto statusWithPlanExecutor = @@ -201,12 +202,14 @@ std::unique_ptr<PlanStage> InternalPlanner::_collectionScan( const boost::intrusive_ptr<ExpressionContext>& expCtx, WorkingSet* ws, const Collection* collection, - Direction direction) { + Direction direction, + boost::optional<RecordId> resumeAfterRecordId) { invariant(collection); CollectionScanParams params; params.shouldWaitForOplogVisibility = shouldWaitForOplogVisibility(expCtx->opCtx, collection, false); + params.resumeAfterRecordId = resumeAfterRecordId; if (FORWARD == direction) { params.direction = CollectionScanParams::FORWARD; diff --git a/src/mongo/db/query/internal_plans.h b/src/mongo/db/query/internal_plans.h index c6bfc9b5dc4..0dc87bc96cd 100644 --- a/src/mongo/db/query/internal_plans.h +++ b/src/mongo/db/query/internal_plans.h @@ -74,7 +74,8 @@ public: StringData ns, const Collection* collection, PlanYieldPolicy::YieldPolicy yieldPolicy, - const Direction direction = FORWARD); + const Direction direction = FORWARD, + boost::optional<RecordId> resumeAfterRecordId = boost::none); /** * Returns a FETCH => DELETE plan. @@ -135,7 +136,8 @@ private: const boost::intrusive_ptr<ExpressionContext>& expCtx, WorkingSet* ws, const Collection* collection, - Direction direction); + Direction direction, + boost::optional<RecordId> resumeAfterRecordId = boost::none); /** * Returns a plan stage that is either an index scan or an index scan with a fetch stage. diff --git a/src/mongo/db/resumable_index_builds.idl b/src/mongo/db/resumable_index_builds.idl index bbe2f80dcd4..74a53553100 100644 --- a/src/mongo/db/resumable_index_builds.idl +++ b/src/mongo/db/resumable_index_builds.idl @@ -68,6 +68,10 @@ structs: for this index build" type: string optional: true + tempDir: + description: "The directory into which we place a file when spilling data to disk" + type: string + optional: true fileName: description: "The name of the file that sorted data is written to" type: string diff --git a/src/mongo/db/sorter/sorter.cpp b/src/mongo/db/sorter/sorter.cpp index 099e8b25e5e..514578b974d 100644 --- a/src/mongo/db/sorter/sorter.cpp +++ b/src/mongo/db/sorter/sorter.cpp @@ -539,11 +539,8 @@ public: NoLimitSorter(const SortOptions& opts, const Comparator& comp, const Settings& settings = Settings()) - : _comp(comp), _settings(settings), _opts(opts) { - verify(_opts.limit == 0); - if (_opts.extSortAllowed) { - this->_fileName = _opts.tempDir + "/" + nextFileName(); - } + : Sorter<Key, Value>(opts), _comp(comp), _settings(settings) { + invariant(opts.limit == 0); } NoLimitSorter(const std::string& fileName, @@ -551,14 +548,13 @@ public: const SortOptions& opts, const Comparator& comp, const Settings& settings = Settings()) - : _comp(comp), + : Sorter<Key, Value>(opts, fileName), + _comp(comp), _settings(settings), - _opts(opts), - _nextSortedFileWriterOffset(ranges.back().getEndOffset()) { - invariant(_opts.extSortAllowed); + _nextSortedFileWriterOffset(!ranges.empty() ? ranges.back().getEndOffset() : 0) { + invariant(opts.extSortAllowed); this->_usedDisk = true; - this->_fileName = fileName; std::transform(ranges.begin(), ranges.end(), @@ -589,7 +585,7 @@ public: _memUsed += key.memUsageForSorter(); _memUsed += val.memUsageForSorter(); - if (_memUsed > _opts.maxMemoryUsageBytes) + if (_memUsed > this->_opts.maxMemoryUsageBytes) spill(); } @@ -601,7 +597,7 @@ public: _data->emplace_back(std::move(key), std::move(val)); - if (_memUsed > _opts.maxMemoryUsageBytes) + if (_memUsed > this->_opts.maxMemoryUsageBytes) spill(); } @@ -614,7 +610,7 @@ public: } spill(); - Iterator* mergeIt = Iterator::merge(this->_iters, this->_fileName, _opts, _comp); + Iterator* mergeIt = Iterator::merge(this->_iters, this->_fileName, this->_opts, _comp); _done = true; return mergeIt; } @@ -648,20 +644,21 @@ private: if (_data->empty()) return; - if (!_opts.extSortAllowed) { + if (!this->_opts.extSortAllowed) { // This error message only applies to sorts from user queries made through the find or // aggregation commands. Other clients, such as bulk index builds, should suppress this // error, either by allowing external sorting or by catching and throwing a more // appropriate error. uasserted(ErrorCodes::QueryExceededMemoryLimitNoDiskUseAllowed, - str::stream() << "Sort exceeded memory limit of " << _opts.maxMemoryUsageBytes - << " bytes, but did not opt in to external sorting."); + str::stream() + << "Sort exceeded memory limit of " << this->_opts.maxMemoryUsageBytes + << " bytes, but did not opt in to external sorting."); } sort(); SortedFileWriter<Key, Value> writer( - _opts, this->_fileName, _nextSortedFileWriterOffset, _settings); + this->_opts, this->_fileName, _nextSortedFileWriterOffset, _settings); for (; !_data->empty(); _data->pop_front()) { writer.addAlreadySorted(_data->front().first, _data->front().second); } @@ -675,7 +672,6 @@ private: const Comparator _comp; const Settings _settings; - SortOptions _opts; std::streampos _nextSortedFileWriterOffset = 0; bool _done = false; size_t _memUsed = 0; @@ -741,19 +737,15 @@ public: TopKSorter(const SortOptions& opts, const Comparator& comp, const Settings& settings = Settings()) - : _comp(comp), + : Sorter<Key, Value>(opts), + _comp(comp), _settings(settings), - _opts(opts), _memUsed(0), _haveCutoff(false), _worstCount(0), _medianCount(0) { // This also *works* with limit==1 but LimitOneSorter should be used instead - verify(_opts.limit > 1); - - if (_opts.extSortAllowed) { - this->_fileName = _opts.tempDir + "/" + nextFileName(); - } + invariant(opts.limit > 1); // Preallocate a fixed sized vector of the required size if we don't expect it to have a // major impact on our memory budget. This is the common case with small limits. @@ -778,7 +770,7 @@ public: STLComparator less(_comp); Data contender(key, val); - if (_data.size() < _opts.limit) { + if (_data.size() < this->_opts.limit) { if (_haveCutoff && !less(contender, _cutoff)) return; @@ -787,16 +779,16 @@ public: _memUsed += key.memUsageForSorter(); _memUsed += val.memUsageForSorter(); - if (_data.size() == _opts.limit) + if (_data.size() == this->_opts.limit) std::make_heap(_data.begin(), _data.end(), less); - if (_memUsed > _opts.maxMemoryUsageBytes) + if (_memUsed > this->_opts.maxMemoryUsageBytes) spill(); return; } - verify(_data.size() == _opts.limit); + invariant(_data.size() == this->_opts.limit); if (!less(contender, _data.front())) return; // not good enough @@ -813,7 +805,7 @@ public: _data.back() = {contender.first.getOwned(), contender.second.getOwned()}; std::push_heap(_data.begin(), _data.end(), less); - if (_memUsed > _opts.maxMemoryUsageBytes) + if (_memUsed > this->_opts.maxMemoryUsageBytes) spill(); } @@ -824,7 +816,7 @@ public: } spill(); - Iterator* iterator = Iterator::merge(this->_iters, this->_fileName, _opts, _comp); + Iterator* iterator = Iterator::merge(this->_iters, this->_fileName, this->_opts, _comp); _done = true; return iterator; } @@ -845,7 +837,7 @@ private: void sort() { STLComparator less(_comp); - if (_data.size() == _opts.limit) { + if (_data.size() == this->_opts.limit) { std::sort_heap(_data.begin(), _data.end(), less); } else { std::stable_sort(_data.begin(), _data.end(), less); @@ -914,14 +906,14 @@ private: // Promote _worstSeen or _lastMedian to _cutoff and reset counters if should. - if (_worstCount >= _opts.limit) { + if (_worstCount >= this->_opts.limit) { if (!_haveCutoff || less(_worstSeen, _cutoff)) { _cutoff = _worstSeen; _haveCutoff = true; } _worstCount = 0; } - if (_medianCount >= _opts.limit) { + if (_medianCount >= this->_opts.limit) { if (!_haveCutoff || less(_lastMedian, _cutoff)) { _cutoff = _lastMedian; _haveCutoff = true; @@ -937,13 +929,13 @@ private: if (_data.empty()) return; - if (!_opts.extSortAllowed) { + if (!this->_opts.extSortAllowed) { // This error message only applies to sorts from user queries made through the find or // aggregation commands. Other clients should suppress this error, either by allowing // external sorting or by catching and throwing a more appropriate error. uasserted(ErrorCodes::QueryExceededMemoryLimitNoDiskUseAllowed, str::stream() - << "Sort exceeded memory limit of " << _opts.maxMemoryUsageBytes + << "Sort exceeded memory limit of " << this->_opts.maxMemoryUsageBytes << " bytes, but did not opt in to external sorting. Aborting operation." << " Pass allowDiskUse:true to opt in."); } @@ -955,7 +947,7 @@ private: updateCutoff(); SortedFileWriter<Key, Value> writer( - _opts, this->_fileName, _nextSortedFileWriterOffset, _settings); + this->_opts, this->_fileName, _nextSortedFileWriterOffset, _settings); for (size_t i = 0; i < _data.size(); i++) { writer.addAlreadySorted(_data[i].first, _data[i].second); } @@ -972,7 +964,6 @@ private: const Comparator _comp; const Settings _settings; - SortOptions _opts; std::streampos _nextSortedFileWriterOffset = 0; bool _done = false; size_t _memUsed; @@ -992,6 +983,10 @@ private: } // namespace sorter template <typename Key, typename Value> +Sorter<Key, Value>::Sorter(const SortOptions& opts) + : Sorter(opts, opts.extSortAllowed ? opts.tempDir + "/" + nextFileName() : "") {} + +template <typename Key, typename Value> std::vector<SorterRange> Sorter<Key, Value>::_getRanges() const { std::vector<SorterRange> ranges; ranges.reserve(_iters.size()); @@ -1185,8 +1180,6 @@ Sorter<Key, Value>* Sorter<Key, Value>::makeFromExistingRanges( "NoLimitSorter (limit 0), but got limit " << opts.limit); - invariant(!ranges.empty()); - return new sorter::NoLimitSorter<Key, Value, Comparator>( fileName, ranges, opts, comp, settings); } diff --git a/src/mongo/db/sorter/sorter.h b/src/mongo/db/sorter/sorter.h index 1debcf5e318..d301c4c6205 100644 --- a/src/mongo/db/sorter/sorter.h +++ b/src/mongo/db/sorter/sorter.h @@ -105,7 +105,7 @@ struct SortOptions { bool extSortAllowed; // Directory into which we place a file when spilling to disk. Must be explicitly set if - // extSortAllowed is true, unless constructing a Sorter from an existing file and ranges. + // extSortAllowed is true. std::string tempDir; SortOptions() : limit(0), maxMemoryUsageBytes(64 * 1024 * 1024), extSortAllowed(false) {} @@ -223,10 +223,16 @@ public: Settings; struct PersistedState { + std::string tempDir; std::string fileName; std::vector<SorterRange> ranges; }; + Sorter(const SortOptions& opts); + + Sorter(const SortOptions& opts, const std::string& fileName) + : _opts(opts), _fileName(fileName) {} + template <typename Comparator> static Sorter* make(const SortOptions& opts, const Comparator& comp, @@ -257,7 +263,7 @@ public: } PersistedState getPersistedState() const { - return {_fileName, _getRanges()}; + return {_opts.tempDir, _fileName, _getRanges()}; } void persistDataForShutdown(); @@ -274,6 +280,7 @@ protected: // Whether the files written by this Sorter should be kept on destruction. bool _shouldKeepFilesOnDestruction = false; + SortOptions _opts; std::string _fileName; std::vector<std::shared_ptr<Iterator>> _iters; // Data that has already been spilled. diff --git a/src/mongo/db/sorter/sorter_test.cpp b/src/mongo/db/sorter/sorter_test.cpp index 2b2d2d77992..4de5eb031f8 100644 --- a/src/mongo/db/sorter/sorter_test.cpp +++ b/src/mongo/db/sorter/sorter_test.cpp @@ -482,6 +482,7 @@ private: void assertRangeInfo(unowned_ptr<IWSorter> sorter, const SortOptions& opts) { auto state = sorter->getPersistedState(); if (opts.extSortAllowed) { + ASSERT_EQ(state.tempDir, opts.tempDir); ASSERT_STRING_CONTAINS(state.fileName, opts.tempDir); } if (auto numRanges = correctNumRanges()) { |