diff options
Diffstat (limited to 'src/mongo')
107 files changed, 962 insertions, 1010 deletions
diff --git a/src/mongo/db/auth/auth_op_observer_test.cpp b/src/mongo/db/auth/auth_op_observer_test.cpp index 4912b71a133..d59a98e6412 100644 --- a/src/mongo/db/auth/auth_op_observer_test.cpp +++ b/src/mongo/db/auth/auth_op_observer_test.cpp @@ -77,7 +77,7 @@ public: ASSERT_OK(replCoord->setFollowerMode(repl::MemberState::RS_PRIMARY)); // Create test collection - writeConflictRetry(opCtx.get(), "createColl", _nss.ns(), [&] { + writeConflictRetry(opCtx.get(), "createColl", _nss, [&] { opCtx->recoveryUnit()->setTimestampReadSource(RecoveryUnit::ReadSource::kNoTimestamp); opCtx->recoveryUnit()->abandonSnapshot(); diff --git a/src/mongo/db/catalog/coll_mod.cpp b/src/mongo/db/catalog/coll_mod.cpp index e1cb1135852..5d264e189c0 100644 --- a/src/mongo/db/catalog/coll_mod.cpp +++ b/src/mongo/db/catalog/coll_mod.cpp @@ -849,7 +849,7 @@ Status _collModInternal(OperationContext* opCtx, LOGV2(5324200, "CMD: collMod", "cmdObj"_attr = cmd.toBSON(BSONObj())); } - return writeConflictRetry(opCtx, "collMod", nss.ns(), [&] { + return writeConflictRetry(opCtx, "collMod", nss, [&] { WriteUnitOfWork wunit(opCtx); // Handle collMod on a view and return early. The CollectionCatalog handles the creation of diff --git a/src/mongo/db/catalog/column_index_consistency.cpp b/src/mongo/db/catalog/column_index_consistency.cpp index 768d328436d..119d75d42fd 100644 --- a/src/mongo/db/catalog/column_index_consistency.cpp +++ b/src/mongo/db/catalog/column_index_consistency.cpp @@ -226,7 +226,7 @@ void ColumnIndexConsistency::repairIndexEntries(OperationContext* opCtx, ColumnStoreAccessMethod* csam = checked_cast<ColumnStoreAccessMethod*>(index->accessMethod()); - writeConflictRetry(opCtx, "removingExtraColumnIndexEntries", _validateState->nss().ns(), [&] { + writeConflictRetry(opCtx, "removingExtraColumnIndexEntries", _validateState->nss(), [&] { WriteUnitOfWork wunit(opCtx); auto& indexResults = results->indexResultsMap[csam->indexName()]; auto cursor = csam->writableStorage()->newWriteCursor(opCtx); diff --git a/src/mongo/db/catalog/create_collection.cpp b/src/mongo/db/catalog/create_collection.cpp index 936786b1e75..84103a962a4 100644 --- a/src/mongo/db/catalog/create_collection.cpp +++ b/src/mongo/db/catalog/create_collection.cpp @@ -163,7 +163,7 @@ Status _createView(OperationContext* opCtx, << "': this is a reserved system namespace", !nss.isSystemDotViews()); - return writeConflictRetry(opCtx, "create", nss.ns(), [&] { + return writeConflictRetry(opCtx, "create", nss, [&] { AutoGetDb autoDb(opCtx, nss.dbName(), MODE_IX); Lock::CollectionLock collLock(opCtx, nss, MODE_IX); // Operations all lock system.views in the end to prevent deadlock. @@ -362,104 +362,101 @@ Status _createTimeseries(OperationContext* opCtx, bool existingBucketCollectionIsCompatible = false; - Status ret = - writeConflictRetry(opCtx, "createBucketCollection", bucketsNs.ns(), [&]() -> Status { - AutoGetDb autoDb(opCtx, bucketsNs.dbName(), MODE_IX); - Lock::CollectionLock bucketsCollLock(opCtx, bucketsNs, MODE_X); - auto db = autoDb.ensureDbExists(opCtx); - - // Check if there already exist a Collection on the namespace we will later create a - // view on. We're not holding a Collection lock for this Collection so we may only check - // if the pointer is null or not. The answer may also change at any point after this - // call which is fine as we properly handle an orphaned bucket collection. This check is - // just here to prevent it from being created in the common case. - Status status = catalog::checkIfNamespaceExists(opCtx, ns); - if (!status.isOK()) { - return status; - } + Status ret = writeConflictRetry(opCtx, "createBucketCollection", bucketsNs, [&]() -> Status { + AutoGetDb autoDb(opCtx, bucketsNs.dbName(), MODE_IX); + Lock::CollectionLock bucketsCollLock(opCtx, bucketsNs, MODE_X); + auto db = autoDb.ensureDbExists(opCtx); - if (opCtx->writesAreReplicated() && - !repl::ReplicationCoordinator::get(opCtx)->canAcceptWritesFor(opCtx, bucketsNs)) { - // Report the error with the user provided namespace - return Status(ErrorCodes::NotWritablePrimary, - str::stream() << "Not primary while creating collection " - << ns.toStringForErrorMsg()); - } + // Check if there already exist a Collection on the namespace we will later create a + // view on. We're not holding a Collection lock for this Collection so we may only check + // if the pointer is null or not. The answer may also change at any point after this + // call which is fine as we properly handle an orphaned bucket collection. This check is + // just here to prevent it from being created in the common case. + Status status = catalog::checkIfNamespaceExists(opCtx, ns); + if (!status.isOK()) { + return status; + } - CollectionShardingState::assertCollectionLockedAndAcquire(opCtx, bucketsNs) - ->checkShardVersionOrThrow(opCtx); + if (opCtx->writesAreReplicated() && + !repl::ReplicationCoordinator::get(opCtx)->canAcceptWritesFor(opCtx, bucketsNs)) { + // Report the error with the user provided namespace + return Status(ErrorCodes::NotWritablePrimary, + str::stream() << "Not primary while creating collection " + << ns.toStringForErrorMsg()); + } - WriteUnitOfWork wuow(opCtx); - AutoStatsTracker bucketsStatsTracker( - opCtx, - bucketsNs, - Top::LockType::NotLocked, - AutoStatsTracker::LogMode::kUpdateTopAndCurOp, - CollectionCatalog::get(opCtx)->getDatabaseProfileLevel(ns.dbName())); - - // If the buckets collection and time-series view creation roll back, ensure that their - // Top entries are deleted. - opCtx->recoveryUnit()->onRollback( - [serviceContext = opCtx->getServiceContext(), bucketsNs](OperationContext*) { - Top::get(serviceContext).collectionDropped(bucketsNs); - }); + CollectionShardingState::assertCollectionLockedAndAcquire(opCtx, bucketsNs) + ->checkShardVersionOrThrow(opCtx); + + WriteUnitOfWork wuow(opCtx); + AutoStatsTracker bucketsStatsTracker( + opCtx, + bucketsNs, + Top::LockType::NotLocked, + AutoStatsTracker::LogMode::kUpdateTopAndCurOp, + CollectionCatalog::get(opCtx)->getDatabaseProfileLevel(ns.dbName())); + + // If the buckets collection and time-series view creation roll back, ensure that their + // Top entries are deleted. + opCtx->recoveryUnit()->onRollback( + [serviceContext = opCtx->getServiceContext(), bucketsNs](OperationContext*) { + Top::get(serviceContext).collectionDropped(bucketsNs); + }); - // Prepare collection option and index spec using the provided options. In case the - // collection already exist we use these to validate that they are the same as being - // requested here. - CollectionOptions bucketsOptions = options; - bucketsOptions.validator = validatorObj; + // Prepare collection option and index spec using the provided options. In case the + // collection already exist we use these to validate that they are the same as being + // requested here. + CollectionOptions bucketsOptions = options; + bucketsOptions.validator = validatorObj; - // Cluster time-series buckets collections by _id. - auto expireAfterSeconds = options.expireAfterSeconds; - if (expireAfterSeconds) { - uassertStatusOK(index_key_validate::validateExpireAfterSeconds( - *expireAfterSeconds, - index_key_validate::ValidateExpireAfterSecondsMode::kClusteredTTLIndex)); - bucketsOptions.expireAfterSeconds = expireAfterSeconds; - } + // Cluster time-series buckets collections by _id. + auto expireAfterSeconds = options.expireAfterSeconds; + if (expireAfterSeconds) { + uassertStatusOK(index_key_validate::validateExpireAfterSeconds( + *expireAfterSeconds, + index_key_validate::ValidateExpireAfterSecondsMode::kClusteredTTLIndex)); + bucketsOptions.expireAfterSeconds = expireAfterSeconds; + } + + bucketsOptions.clusteredIndex = clustered_util::makeCanonicalClusteredInfoForLegacyFormat(); - bucketsOptions.clusteredIndex = - clustered_util::makeCanonicalClusteredInfoForLegacyFormat(); + if (auto coll = + CollectionCatalog::get(opCtx)->lookupCollectionByNamespace(opCtx, bucketsNs)) { + // Compare CollectionOptions and eventual TTL index to see if this bucket collection + // may be reused for this request. + existingBucketCollectionIsCompatible = + coll->getCollectionOptions().matchesStorageOptions( + bucketsOptions, CollatorFactoryInterface::get(opCtx->getServiceContext())); + + // We may have a bucket collection created with a previous version of mongod, this + // is also OK as we do not convert bucket collections to latest version during + // upgrade. + while (!existingBucketCollectionIsCompatible && + bucketVersion > timeseries::kTimeseriesControlMinVersion) { + validatorObj = _generateTimeseriesValidator(--bucketVersion, timeField); + bucketsOptions.validator = validatorObj; - if (auto coll = - CollectionCatalog::get(opCtx)->lookupCollectionByNamespace(opCtx, bucketsNs)) { - // Compare CollectionOptions and eventual TTL index to see if this bucket collection - // may be reused for this request. existingBucketCollectionIsCompatible = coll->getCollectionOptions().matchesStorageOptions( bucketsOptions, CollatorFactoryInterface::get(opCtx->getServiceContext())); - - // We may have a bucket collection created with a previous version of mongod, this - // is also OK as we do not convert bucket collections to latest version during - // upgrade. - while (!existingBucketCollectionIsCompatible && - bucketVersion > timeseries::kTimeseriesControlMinVersion) { - validatorObj = _generateTimeseriesValidator(--bucketVersion, timeField); - bucketsOptions.validator = validatorObj; - - existingBucketCollectionIsCompatible = - coll->getCollectionOptions().matchesStorageOptions( - bucketsOptions, - CollatorFactoryInterface::get(opCtx->getServiceContext())); - } - - return Status(ErrorCodes::NamespaceExists, - str::stream() - << "Bucket Collection already exists. NS: " - << bucketsNs.toStringForErrorMsg() << ". UUID: " << coll->uuid()); } - // Create the buckets collection that will back the view. - const bool createIdIndex = false; - uassertStatusOK(db->userCreateNS(opCtx, bucketsNs, bucketsOptions, createIdIndex)); + return Status(ErrorCodes::NamespaceExists, + str::stream() + << "Bucket Collection already exists. NS: " + << bucketsNs.toStringForErrorMsg() << ". UUID: " << coll->uuid()); + } - CollectionWriter collectionWriter(opCtx, bucketsNs); - uassertStatusOK(_createDefaultTimeseriesIndex(opCtx, collectionWriter)); - wuow.commit(); - return Status::OK(); - }); + // Create the buckets collection that will back the view. + const bool createIdIndex = false; + uassertStatusOK(db->userCreateNS(opCtx, bucketsNs, bucketsOptions, createIdIndex)); + + CollectionWriter collectionWriter(opCtx, bucketsNs); + uassertStatusOK(_createDefaultTimeseriesIndex(opCtx, collectionWriter)); + wuow.commit(); + return Status::OK(); + }); // If compatible bucket collection already exists then proceed with creating view defintion. // If the 'temp' flag is true, we are in the $out stage, and should return without creating the @@ -467,7 +464,7 @@ Status _createTimeseries(OperationContext* opCtx, if ((!ret.isOK() && !existingBucketCollectionIsCompatible) || options.temp) return ret; - ret = writeConflictRetry(opCtx, "create", ns.ns(), [&]() -> Status { + ret = writeConflictRetry(opCtx, "create", ns, [&]() -> Status { AutoGetCollection autoColl( opCtx, ns, @@ -552,7 +549,7 @@ Status _createCollection( const CollectionOptions& collectionOptions, const boost::optional<BSONObj>& idIndex, const boost::optional<VirtualCollectionOptions>& virtualCollectionOptions = boost::none) { - return writeConflictRetry(opCtx, "create", nss.ns(), [&] { + return writeConflictRetry(opCtx, "create", nss, [&] { // If a change collection is to be created, that is, the change streams are being enabled // for a tenant, acquire exclusive tenant lock. AutoGetDb autoDb(opCtx, @@ -835,7 +832,7 @@ Status createCollectionForApplyOps(OperationContext* opCtx, "conflictingUUID"_attr = uuid, "tempName"_attr = tmpName); Status status = - writeConflictRetry(opCtx, "createCollectionForApplyOps", newCollName.ns(), [&] { + writeConflictRetry(opCtx, "createCollectionForApplyOps", newCollName, [&] { WriteUnitOfWork wuow(opCtx); Status status = db->renameCollection(opCtx, newCollName, tmpName, stayTemp); if (!status.isOK()) @@ -886,7 +883,7 @@ Status createCollectionForApplyOps(OperationContext* opCtx, str::stream() << "Invalid name " << newCollName.toStringForErrorMsg() << " for UUID " << uuid, currentName->db() == newCollName.db()); - return writeConflictRetry(opCtx, "createCollectionForApplyOps", newCollName.ns(), [&] { + return writeConflictRetry(opCtx, "createCollectionForApplyOps", newCollName, [&] { WriteUnitOfWork wuow(opCtx); Status status = db->renameCollection(opCtx, *currentName, newCollName, stayTemp); if (!status.isOK()) diff --git a/src/mongo/db/catalog/create_collection_test.cpp b/src/mongo/db/catalog/create_collection_test.cpp index 91fe9834ee9..0de23289103 100644 --- a/src/mongo/db/catalog/create_collection_test.cpp +++ b/src/mongo/db/catalog/create_collection_test.cpp @@ -112,7 +112,7 @@ void CreateCollectionTest::validateValidator(const std::string& validatorStr, options.validator = fromjson(validatorStr); options.uuid = UUID::gen(); - return writeConflictRetry(opCtx.get(), "create", newNss.ns(), [&] { + return writeConflictRetry(opCtx.get(), "create", newNss, [&] { AutoGetCollection autoColl(opCtx.get(), newNss, MODE_IX); auto db = autoColl.ensureDbExists(opCtx.get()); ASSERT_TRUE(db) << "Cannot create collection " << newNss.toStringForErrorMsg() diff --git a/src/mongo/db/catalog/database_holder_impl.cpp b/src/mongo/db/catalog/database_holder_impl.cpp index 49ef7b48e1a..890f526e8ca 100644 --- a/src/mongo/db/catalog/database_holder_impl.cpp +++ b/src/mongo/db/catalog/database_holder_impl.cpp @@ -230,7 +230,7 @@ void DatabaseHolderImpl::dropDb(OperationContext* opCtx, Database* db) { }); auto const storageEngine = serviceContext->getStorageEngine(); - writeConflictRetry(opCtx, "dropDatabase", toStringForLogging(name), [&] { + writeConflictRetry(opCtx, "dropDatabase", NamespaceString(name), [&] { storageEngine->dropDatabase(opCtx, name).transitional_ignore(); }); } diff --git a/src/mongo/db/catalog/database_test.cpp b/src/mongo/db/catalog/database_test.cpp index f34e803b7b0..50fe7ee7aa1 100644 --- a/src/mongo/db/catalog/database_test.cpp +++ b/src/mongo/db/catalog/database_test.cpp @@ -114,7 +114,7 @@ void DatabaseTest::tearDown() { } TEST_F(DatabaseTest, SetDropPendingThrowsExceptionIfDatabaseIsAlreadyInADropPendingState) { - writeConflictRetry(_opCtx.get(), "testSetDropPending", _nss.ns(), [this] { + writeConflictRetry(_opCtx.get(), "testSetDropPending", _nss, [this] { AutoGetDb autoDb(_opCtx.get(), _nss.dbName(), MODE_X); auto db = autoDb.ensureDbExists(_opCtx.get()); ASSERT_TRUE(db); @@ -137,7 +137,7 @@ TEST_F(DatabaseTest, SetDropPendingThrowsExceptionIfDatabaseIsAlreadyInADropPend TEST_F(DatabaseTest, CreateCollectionThrowsExceptionWhenDatabaseIsInADropPendingState) { writeConflictRetry( - _opCtx.get(), "testÇreateCollectionWhenDatabaseIsInADropPendingState", _nss.ns(), [this] { + _opCtx.get(), "testÇreateCollectionWhenDatabaseIsInADropPendingState", _nss, [this] { AutoGetDb autoDb(_opCtx.get(), _nss.dbName(), MODE_X); auto db = autoDb.ensureDbExists(_opCtx.get()); ASSERT_TRUE(db); @@ -162,7 +162,7 @@ void _testDropCollection(OperationContext* opCtx, const repl::OpTime& dropOpTime = {}, const CollectionOptions& collOpts = {}) { if (createCollectionBeforeDrop) { - writeConflictRetry(opCtx, "testDropCollection", nss.ns(), [=] { + writeConflictRetry(opCtx, "testDropCollection", nss, [=] { WriteUnitOfWork wuow(opCtx); AutoGetDb autoDb(opCtx, nss.dbName(), MODE_X); auto db = autoDb.ensureDbExists(opCtx); @@ -172,7 +172,7 @@ void _testDropCollection(OperationContext* opCtx, }); } - writeConflictRetry(opCtx, "testDropCollection", nss.ns(), [=] { + writeConflictRetry(opCtx, "testDropCollection", nss, [=] { AutoGetDb autoDb(opCtx, nss.dbName(), MODE_X); auto db = autoDb.ensureDbExists(opCtx); ASSERT_TRUE(db); @@ -216,7 +216,7 @@ TEST_F(DatabaseTest, DropCollectionRejectsProvidedDropOpTimeIfWritesAreReplicate auto nss = _nss; AutoGetDb autoDb(opCtx, nss.dbName(), MODE_X); auto db = autoDb.ensureDbExists(opCtx); - writeConflictRetry(opCtx, "testDropOpTimeWithReplicated", nss.ns(), [&] { + writeConflictRetry(opCtx, "testDropOpTimeWithReplicated", nss, [&] { ASSERT_TRUE(db); WriteUnitOfWork wuow(opCtx); @@ -231,7 +231,7 @@ TEST_F(DatabaseTest, DropCollectionRejectsProvidedDropOpTimeIfWritesAreReplicate void _testDropCollectionThrowsExceptionIfThereAreIndexesInProgress(OperationContext* opCtx, const NamespaceString& nss) { - writeConflictRetry(opCtx, "testDropCollectionWithIndexesInProgress", nss.ns(), [opCtx, nss] { + writeConflictRetry(opCtx, "testDropCollectionWithIndexesInProgress", nss, [opCtx, nss] { AutoGetDb autoDb(opCtx, nss.dbName(), MODE_X); auto db = autoDb.ensureDbExists(opCtx); ASSERT_TRUE(db); @@ -295,7 +295,7 @@ TEST_F(DatabaseTest, RenameCollectionPreservesUuidOfSourceCollectionAndUpdatesUu ASSERT_TRUE(db); auto fromUuid = UUID::gen(); - writeConflictRetry(opCtx, "create", fromNss.ns(), [&] { + writeConflictRetry(opCtx, "create", fromNss, [&] { auto catalog = CollectionCatalog::get(opCtx); ASSERT_EQUALS(boost::none, catalog->lookupNSSByUUID(opCtx, fromUuid)); @@ -307,7 +307,7 @@ TEST_F(DatabaseTest, RenameCollectionPreservesUuidOfSourceCollectionAndUpdatesUu wuow.commit(); }); - writeConflictRetry(opCtx, "rename", fromNss.ns(), [&] { + writeConflictRetry(opCtx, "rename", fromNss, [&] { WriteUnitOfWork wuow(opCtx); auto stayTemp = false; ASSERT_OK(db->renameCollection(opCtx, fromNss, toNss, stayTemp)); @@ -331,7 +331,7 @@ TEST_F(DatabaseTest, RenameCollectionPreservesUuidOfSourceCollectionAndUpdatesUu TEST_F(DatabaseTest, MakeUniqueCollectionNamespaceReturnsFailedToParseIfModelDoesNotContainPercentSign) { - writeConflictRetry(_opCtx.get(), "testMakeUniqueCollectionNamespace", _nss.ns(), [this] { + writeConflictRetry(_opCtx.get(), "testMakeUniqueCollectionNamespace", _nss, [this] { AutoGetDb autoDb(_opCtx.get(), _nss.dbName(), MODE_X); auto db = autoDb.ensureDbExists(_opCtx.get()); ASSERT_TRUE(db); @@ -342,7 +342,7 @@ TEST_F(DatabaseTest, } TEST_F(DatabaseTest, MakeUniqueCollectionNamespaceReplacesPercentSignsWithRandomCharacters) { - writeConflictRetry(_opCtx.get(), "testMakeUniqueCollectionNamespace", _nss.ns(), [this] { + writeConflictRetry(_opCtx.get(), "testMakeUniqueCollectionNamespace", _nss, [this] { AutoGetDb autoDb(_opCtx.get(), _nss.dbName(), MODE_X); auto db = autoDb.ensureDbExists(_opCtx.get()); ASSERT_TRUE(db); @@ -385,7 +385,7 @@ TEST_F(DatabaseTest, MakeUniqueCollectionNamespaceReplacesPercentSignsWithRandom TEST_F( DatabaseTest, MakeUniqueCollectionNamespaceReturnsNamespaceExistsIfGeneratedNamesMatchExistingCollections) { - writeConflictRetry(_opCtx.get(), "testMakeUniqueCollectionNamespace", _nss.ns(), [this] { + writeConflictRetry(_opCtx.get(), "testMakeUniqueCollectionNamespace", _nss, [this] { AutoGetDb autoDb(_opCtx.get(), _nss.dbName(), MODE_X); auto db = autoDb.ensureDbExists(_opCtx.get()); ASSERT_TRUE(db); @@ -467,7 +467,7 @@ TEST_F(DatabaseTest, AutoGetCollectionForReadCommandSucceedsWithDeadlineMin) { TEST_F(DatabaseTest, CreateCollectionProhibitsReplicatedCollectionsWithoutIdIndex) { writeConflictRetry(_opCtx.get(), "testÇreateCollectionProhibitsReplicatedCollectionsWithoutIdIndex", - _nss.ns(), + _nss, [this] { AutoGetDb autoDb(_opCtx.get(), _nss.dbName(), MODE_X); auto db = autoDb.ensureDbExists(_opCtx.get()); diff --git a/src/mongo/db/catalog/drop_collection.cpp b/src/mongo/db/catalog/drop_collection.cpp index c4aa11d09a2..b1428e587ae 100644 --- a/src/mongo/db/catalog/drop_collection.cpp +++ b/src/mongo/db/catalog/drop_collection.cpp @@ -371,7 +371,7 @@ Status _dropCollection(OperationContext* opCtx, boost::optional<UUID> dropIfUUIDNotMatching = boost::none) { try { - return writeConflictRetry(opCtx, "drop", collectionName.ns(), [&] { + return writeConflictRetry(opCtx, "drop", collectionName, [&] { // If a change collection is to be dropped, that is, the change streams are being // disabled for a tenant, acquire exclusive tenant lock. AutoGetDb autoDb(opCtx, @@ -453,7 +453,7 @@ Status _dropCollection(OperationContext* opCtx, // Drop the buckets collection in its own writeConflictRetry so that if // it throws a WCE, only the buckets collection drop is retried. writeConflictRetry( - opCtx, "drop", bucketsNs.ns(), [opCtx, db, &bucketsNs, fromMigrate] { + opCtx, "drop", bucketsNs, [opCtx, db, &bucketsNs, fromMigrate] { WriteUnitOfWork wuow(opCtx); db->dropCollectionEvenIfSystem(opCtx, bucketsNs, {}, fromMigrate) .ignore(); @@ -584,7 +584,7 @@ Status dropCollectionForApplyOps(OperationContext* opCtx, LOGV2(20333, "Hanging drop collection before lock acquisition while fail point is set"); hangDropCollectionBeforeLockAcquisition.pauseWhileSet(); } - return writeConflictRetry(opCtx, "drop", collectionName.ns(), [&] { + return writeConflictRetry(opCtx, "drop", collectionName, [&] { AutoGetDb autoDb(opCtx, collectionName.dbName(), MODE_IX); Database* db = autoDb.getDb(); if (!db) { diff --git a/src/mongo/db/catalog/drop_database.cpp b/src/mongo/db/catalog/drop_database.cpp index 05994b0faa7..93fac4558c5 100644 --- a/src/mongo/db/catalog/drop_database.cpp +++ b/src/mongo/db/catalog/drop_database.cpp @@ -104,7 +104,7 @@ void _finishDropDatabase(OperationContext* opCtx, IndexBuildsCoordinator::get(opCtx)->assertNoBgOpInProgForDb(dbName); } - writeConflictRetry(opCtx, "dropDatabase_database", toStringForLogging(dbName), [&] { + writeConflictRetry(opCtx, "dropDatabase_database", NamespaceString(dbName), [&] { // We need to replicate the dropDatabase oplog entry and clear the collection catalog in the // same transaction. This is to prevent stepdown from interrupting between these two // operations and leaving this node in an inconsistent state. @@ -262,7 +262,7 @@ Status _dropDatabase(OperationContext* opCtx, const DatabaseName& dbName, bool a logAttrs(dbName), "namespace"_attr = nss); - writeConflictRetry(opCtx, "dropDatabase_views_collection", nss.ns(), [&] { + writeConflictRetry(opCtx, "dropDatabase_views_collection", nss, [&] { WriteUnitOfWork wunit(opCtx); fassert(7193701, db->dropCollectionEvenIfSystem(opCtx, nss)); wunit.commit(); @@ -321,7 +321,7 @@ Status _dropDatabase(OperationContext* opCtx, const DatabaseName& dbName, bool a catalog->lookupCollectionByNamespace(opCtx, nss)->uuid()); } - writeConflictRetry(opCtx, "dropDatabase_collection", nss.ns(), [&] { + writeConflictRetry(opCtx, "dropDatabase_collection", nss, [&] { WriteUnitOfWork wunit(opCtx); // A primary processing this will assign a timestamp when the operation is written // to the oplog. As stated above, a secondary processing must only observe diff --git a/src/mongo/db/catalog/drop_database_test.cpp b/src/mongo/db/catalog/drop_database_test.cpp index 0cf234450bc..5c5bcf75143 100644 --- a/src/mongo/db/catalog/drop_database_test.cpp +++ b/src/mongo/db/catalog/drop_database_test.cpp @@ -182,7 +182,7 @@ void DropDatabaseTest::tearDown() { * Creates a collection without any namespace restrictions. */ void _createCollection(OperationContext* opCtx, const NamespaceString& nss) { - writeConflictRetry(opCtx, "testDropCollection", nss.ns(), [=] { + writeConflictRetry(opCtx, "testDropCollection", nss, [=] { AutoGetDb autoDb(opCtx, nss.dbName(), MODE_X); auto db = autoDb.ensureDbExists(opCtx); ASSERT_TRUE(db); diff --git a/src/mongo/db/catalog/drop_indexes.cpp b/src/mongo/db/catalog/drop_indexes.cpp index 8c5bcd042cb..91250e826ce 100644 --- a/src/mongo/db/catalog/drop_indexes.cpp +++ b/src/mongo/db/catalog/drop_indexes.cpp @@ -508,7 +508,7 @@ DropIndexesReply dropIndexes(OperationContext* opCtx, // The index catalog requires that no active index builders are running when dropping ready // indexes. IndexBuildsCoordinator::get(opCtx)->assertNoIndexBuildInProgForCollection(collectionUUID); - writeConflictRetry(opCtx, "dropIndexes", dbAndUUID.toString(), [&] { + writeConflictRetry(opCtx, "dropIndexes", dbAndUUID, [&] { WriteUnitOfWork wuow(opCtx); // This is necessary to check shard version. @@ -561,16 +561,15 @@ DropIndexesReply dropIndexes(OperationContext* opCtx, invariant((*collection)->getIndexCatalog()->numIndexesInProgress() == 0); } - writeConflictRetry( - opCtx, "dropIndexes", dbAndUUID.toString(), [opCtx, &collection, &indexNames, &reply] { - WriteUnitOfWork wunit(opCtx); + writeConflictRetry(opCtx, "dropIndexes", dbAndUUID, [opCtx, &collection, &indexNames, &reply] { + WriteUnitOfWork wunit(opCtx); - // This is necessary to check shard version. - OldClientContext ctx(opCtx, (*collection)->ns()); - dropReadyIndexes( - opCtx, collection->getWritableCollection(opCtx), indexNames, &reply, false); - wunit.commit(); - }); + // This is necessary to check shard version. + OldClientContext ctx(opCtx, (*collection)->ns()); + dropReadyIndexes( + opCtx, collection->getWritableCollection(opCtx), indexNames, &reply, false); + wunit.commit(); + }); return reply; } @@ -584,7 +583,7 @@ Status dropIndexesForApplyOps(OperationContext* opCtx, auto parsed = DropIndexes::parse( IDLParserContext{"dropIndexes", false /* apiStrict */, nss.tenantId()}, cmdObjWithDb); - return writeConflictRetry(opCtx, "dropIndexes", nss.db(), [opCtx, &nss, &cmdObj, &parsed] { + return writeConflictRetry(opCtx, "dropIndexes", nss, [opCtx, &nss, &cmdObj, &parsed] { AutoGetCollection collection(opCtx, nss, MODE_X); // If db/collection does not exist, short circuit and return. diff --git a/src/mongo/db/catalog/index_builds_manager.cpp b/src/mongo/db/catalog/index_builds_manager.cpp index 06f95b1b368..95394e8f72c 100644 --- a/src/mongo/db/catalog/index_builds_manager.cpp +++ b/src/mongo/db/catalog/index_builds_manager.cpp @@ -115,7 +115,7 @@ Status IndexBuildsManager::setUpIndexBuild(OperationContext* opCtx, std::vector<BSONObj> indexes; try { - indexes = writeConflictRetry(opCtx, "IndexBuildsManager::setUpIndexBuild", nss.ns(), [&]() { + indexes = writeConflictRetry(opCtx, "IndexBuildsManager::setUpIndexBuild", nss, [&]() { MultiIndexBlock::InitMode mode = options.forRecovery ? MultiIndexBlock::InitMode::Recovery : MultiIndexBlock::InitMode::SteadyState; @@ -172,7 +172,7 @@ StatusWith<std::pair<long long, long long>> IndexBuildsManager::startBuildingInd opCtx->checkForInterrupt(); // Cursor is left one past the end of the batch inside writeConflictRetry auto beginBatchId = record->id; - Status status = writeConflictRetry(opCtx, "repairDatabase", ns.ns(), [&] { + Status status = writeConflictRetry(opCtx, "repairDatabase", ns, [&] { // In the case of WCE in a partial batch, we need to go back to the beginning if (!record || (beginBatchId != record->id)) { record = cursor->seekExact(beginBatchId); @@ -216,7 +216,7 @@ StatusWith<std::pair<long long, long long>> IndexBuildsManager::startBuildingInd writeConflictRetry( opCtx, "insertSingleDocumentForInitialSyncOrRecovery-restoreCursor", - ns.ns(), + ns, [&cursor] { cursor->restore(); }); }); if (!insertStatus.isOK()) { @@ -239,7 +239,7 @@ StatusWith<std::pair<long long, long long>> IndexBuildsManager::startBuildingInd ON_BLOCK_EXIT([opCtx, ns, &cursor]() { // restore CAN throw WCE per API writeConflictRetry( - opCtx, "retryRestoreCursor", ns.ns(), [&cursor] { cursor->restore(); }); + opCtx, "retryRestoreCursor", ns, [&cursor] { cursor->restore(); }); }); wunit.commit(); return Status::OK(); @@ -336,7 +336,7 @@ Status IndexBuildsManager::commitIndexBuild(OperationContext* opCtx, return writeConflictRetry( opCtx, "IndexBuildsManager::commitIndexBuild", - nss.ns(), + nss, [this, builder, buildUUID, opCtx, &collection, nss, &onCreateEachFn, &onCommitFn] { WriteUnitOfWork wunit(opCtx); auto status = builder->commit( diff --git a/src/mongo/db/catalog/index_catalog_entry_impl.cpp b/src/mongo/db/catalog/index_catalog_entry_impl.cpp index 5af5e3104f5..bc8c46d8d54 100644 --- a/src/mongo/db/catalog/index_catalog_entry_impl.cpp +++ b/src/mongo/db/catalog/index_catalog_entry_impl.cpp @@ -295,7 +295,7 @@ Status IndexCatalogEntryImpl::_setMultikeyInMultiDocumentTransaction( } writeConflictRetry( - opCtx, "set index multikey", collection->ns().ns(), [&] { + opCtx, "set index multikey", collection->ns(), [&] { WriteUnitOfWork wuow(opCtx); // If we have a prepare optime for recovery, then we always use that. This is safe since diff --git a/src/mongo/db/catalog/index_consistency.cpp b/src/mongo/db/catalog/index_consistency.cpp index fca2e520a42..e46949d4eff 100644 --- a/src/mongo/db/catalog/index_consistency.cpp +++ b/src/mongo/db/catalog/index_consistency.cpp @@ -497,13 +497,12 @@ void KeyStringIndexConsistency::addIndexKey(OperationContext* opCtx, InsertDeleteOptions options; options.dupsAllowed = !indexInfo->unique; int64_t numDeleted = 0; - writeConflictRetry( - opCtx, "removingExtraIndexEntries", _validateState->nss().ns(), [&] { - WriteUnitOfWork wunit(opCtx); - Status status = indexInfo->accessMethod->asSortedData()->removeKeys( - opCtx, {ks}, options, &numDeleted); - wunit.commit(); - }); + writeConflictRetry(opCtx, "removingExtraIndexEntries", _validateState->nss(), [&] { + WriteUnitOfWork wunit(opCtx); + Status status = indexInfo->accessMethod->asSortedData()->removeKeys( + opCtx, {ks}, options, &numDeleted); + wunit.commit(); + }); auto& indexResults = results->indexResultsMap[indexInfo->indexName]; indexResults.keysTraversed -= numDeleted; results->numRemovedExtraIndexEntries += numDeleted; @@ -871,7 +870,7 @@ int64_t KeyStringIndexConsistency::traverseIndex(OperationContext* opCtx, // 2. This index was built before 3.4, and there is no multikey path information for // the index. We can effectively 'upgrade' the index so that it does not need to be // rebuilt to update this information. - writeConflictRetry(opCtx, "updateMultikeyPaths", _validateState->nss().ns(), [&]() { + writeConflictRetry(opCtx, "updateMultikeyPaths", _validateState->nss(), [&]() { WriteUnitOfWork wuow(opCtx); auto writeableIndex = const_cast<IndexCatalogEntry*>(index); const bool isMultikey = true; @@ -898,7 +897,7 @@ int64_t KeyStringIndexConsistency::traverseIndex(OperationContext* opCtx, // This makes an improvement in the case that no documents make the index multikey and // the flag can be unset entirely. This may be due to a change in the data or historical // multikey bugs that have persisted incorrect multikey infomation. - writeConflictRetry(opCtx, "unsetMultikeyPaths", _validateState->nss().ns(), [&]() { + writeConflictRetry(opCtx, "unsetMultikeyPaths", _validateState->nss(), [&]() { WriteUnitOfWork wuow(opCtx); auto writeableIndex = const_cast<IndexCatalogEntry*>(index); const bool isMultikey = false; @@ -972,7 +971,7 @@ void KeyStringIndexConsistency::traverseRecord(OperationContext* opCtx, if (!index->isMultikey(opCtx, coll) && shouldBeMultikey) { if (_validateState->fixErrors()) { - writeConflictRetry(opCtx, "setIndexAsMultikey", coll->ns().ns(), [&] { + writeConflictRetry(opCtx, "setIndexAsMultikey", coll->ns(), [&] { WriteUnitOfWork wuow(opCtx); coll->getIndexCatalog()->setMultikeyPaths( opCtx, coll, descriptor, *multikeyMetadataKeys, *documentMultikeyPaths); @@ -1009,7 +1008,7 @@ void KeyStringIndexConsistency::traverseRecord(OperationContext* opCtx, const MultikeyPaths& indexPaths = index->getMultikeyPaths(opCtx, coll); if (!MultikeyPathTracker::covers(indexPaths, *documentMultikeyPaths.get())) { if (_validateState->fixErrors()) { - writeConflictRetry(opCtx, "increaseMultikeyPathCoverage", coll->ns().ns(), [&] { + writeConflictRetry(opCtx, "increaseMultikeyPathCoverage", coll->ns(), [&] { WriteUnitOfWork wuow(opCtx); coll->getIndexCatalog()->setMultikeyPaths( opCtx, coll, descriptor, *multikeyMetadataKeys, *documentMultikeyPaths); diff --git a/src/mongo/db/catalog/index_repair.cpp b/src/mongo/db/catalog/index_repair.cpp index 8c22180f3ab..4b3744a9f23 100644 --- a/src/mongo/db/catalog/index_repair.cpp +++ b/src/mongo/db/catalog/index_repair.cpp @@ -52,7 +52,7 @@ StatusWith<int> moveRecordToLostAndFound(OperationContext* opCtx, // Creates the collection if it doesn't exist. if (!localCollection) { Status status = - writeConflictRetry(opCtx, "createLostAndFoundCollection", lostAndFoundNss.ns(), [&]() { + writeConflictRetry(opCtx, "createLostAndFoundCollection", lostAndFoundNss, [&]() { // Ensure the database exists. auto db = autoColl.ensureDbExists(opCtx); invariant(db, lostAndFoundNss.toStringForErrorMsg()); @@ -81,7 +81,7 @@ StatusWith<int> moveRecordToLostAndFound(OperationContext* opCtx, localCollection.makeYieldable(opCtx, LockedCollectionYieldRestore(opCtx, localCollection)); return writeConflictRetry( - opCtx, "writeDupDocToLostAndFoundCollection", nss.ns(), [&]() -> StatusWith<int> { + opCtx, "writeDupDocToLostAndFoundCollection", nss, [&]() -> StatusWith<int> { WriteUnitOfWork wuow(opCtx); Snapshotted<BSONObj> doc; int docSize = 0; @@ -130,7 +130,7 @@ int repairMissingIndexEntry(OperationContext* opCtx, int64_t numInserted = 0; Status insertStatus = Status::OK(); - writeConflictRetry(opCtx, "insertingMissingIndexEntries", nss.ns(), [&] { + writeConflictRetry(opCtx, "insertingMissingIndexEntries", nss, [&] { WriteUnitOfWork wunit(opCtx); insertStatus = accessMethod->insertKeysAndUpdateMultikeyPaths(opCtx, @@ -195,7 +195,7 @@ int repairMissingIndexEntry(OperationContext* opCtx, // duplicate records is in the index, so we need to add the newer record to the // index. if (dupKeyRid && ridToMove == *dupKeyRid) { - writeConflictRetry(opCtx, "insertingMissingIndexEntries", nss.ns(), [&] { + writeConflictRetry(opCtx, "insertingMissingIndexEntries", nss, [&] { WriteUnitOfWork wunit(opCtx); insertStatus = accessMethod->insertKeysAndUpdateMultikeyPaths( opCtx, coll, {ks}, {}, {}, options, nullptr, nullptr); diff --git a/src/mongo/db/catalog/multi_index_block.cpp b/src/mongo/db/catalog/multi_index_block.cpp index ec78615b408..5d72129db49 100644 --- a/src/mongo/db/catalog/multi_index_block.cpp +++ b/src/mongo/db/catalog/multi_index_block.cpp @@ -869,7 +869,7 @@ Status MultiIndexBlock::dumpInsertsFromBulk( // Do not record duplicates when explicitly ignored. This may be the case on // secondaries. return writeConflictRetry( - opCtx, "recordingDuplicateKey", entry->getNSSFromCatalog(opCtx).ns(), [&] { + opCtx, "recordingDuplicateKey", entry->getNSSFromCatalog(opCtx), [&] { if (dupsAllowed && !onDuplicateRecord && !_ignoreUnique && entry->indexBuildInterceptor()) { WriteUnitOfWork wuow(opCtx); diff --git a/src/mongo/db/catalog/rename_collection.cpp b/src/mongo/db/catalog/rename_collection.cpp index 7ebfab1a0bc..c0f0f633423 100644 --- a/src/mongo/db/catalog/rename_collection.cpp +++ b/src/mongo/db/catalog/rename_collection.cpp @@ -173,7 +173,7 @@ Status renameTargetCollectionToTmp(OperationContext* opCtx, } const auto& tmpName = tmpNameResult.getValue(); const bool stayTemp = true; - return writeConflictRetry(opCtx, "renameCollection", targetNs.ns(), [&] { + return writeConflictRetry(opCtx, "renameCollection", targetNs, [&] { WriteUnitOfWork wunit(opCtx); auto status = targetDB->renameCollection(opCtx, targetNs, tmpName, stayTemp); if (!status.isOK()) @@ -202,7 +202,7 @@ Status renameCollectionDirectly(OperationContext* opCtx, NamespaceString source, NamespaceString target, RenameCollectionOptions options) { - return writeConflictRetry(opCtx, "renameCollection", target.ns(), [&] { + return writeConflictRetry(opCtx, "renameCollection", target, [&] { WriteUnitOfWork wunit(opCtx); { @@ -233,7 +233,7 @@ Status renameCollectionAndDropTarget(OperationContext* opCtx, const CollectionPtr& targetColl, RenameCollectionOptions options, repl::OpTime renameOpTimeFromApplyOps) { - return writeConflictRetry(opCtx, "renameCollection", target.ns(), [&] { + return writeConflictRetry(opCtx, "renameCollection", target, [&] { WriteUnitOfWork wunit(opCtx); // Target collection exists - drop it. @@ -375,7 +375,7 @@ Status renameCollectionWithinDBForApplyOps(OperationContext* opCtx, AutoStatsTracker::LogMode::kUpdateCurOp, CollectionCatalog::get(opCtx)->getDatabaseProfileLevel(source.dbName())); - return writeConflictRetry(opCtx, "renameCollection", target.ns(), [&] { + return writeConflictRetry(opCtx, "renameCollection", target, [&] { auto targetColl = CollectionCatalog::get(opCtx)->lookupCollectionByNamespace(opCtx, target); WriteUnitOfWork wuow(opCtx); if (targetColl) { @@ -576,7 +576,7 @@ Status renameCollectionAcrossDatabases(OperationContext* opCtx, auto collectionOptions = sourceColl->getCollectionOptions(); collectionOptions.uuid = tmpCollUUID.uuid(); - writeConflictRetry(opCtx, "renameCollection", tmpName.ns(), [&] { + writeConflictRetry(opCtx, "renameCollection", tmpName, [&] { WriteUnitOfWork wunit(opCtx); targetDB->createCollection(opCtx, tmpName, collectionOptions); wunit.commit(); @@ -633,7 +633,7 @@ Status renameCollectionAcrossDatabases(OperationContext* opCtx, // index in an unfinished state. For more information on assigning timestamps to multiple index // builds, please see SERVER-35780 and SERVER-35070. if (!indexesToCopy.empty()) { - Status status = writeConflictRetry(opCtx, "renameCollection", tmpName.ns(), [&] { + Status status = writeConflictRetry(opCtx, "renameCollection", tmpName, [&] { WriteUnitOfWork wunit(opCtx); auto fromMigrate = false; try { @@ -681,7 +681,7 @@ Status renameCollectionAcrossDatabases(OperationContext* opCtx, opCtx->checkForInterrupt(); // Cursor is left one past the end of the batch inside writeConflictRetry. auto beginBatchId = record->id; - Status status = writeConflictRetry(opCtx, "renameCollection", tmpName.ns(), [&] { + Status status = writeConflictRetry(opCtx, "renameCollection", tmpName, [&] { // Always reposition cursor in case it gets a WCE midway through. record = cursor->seekExact(beginBatchId); @@ -733,7 +733,7 @@ Status renameCollectionAcrossDatabases(OperationContext* opCtx, cursor->save(); // When this exits via success or WCE, we need to restore the cursor. - ON_BLOCK_EXIT([opCtx, ns = tmpName.ns(), &cursor]() { + ON_BLOCK_EXIT([opCtx, ns = tmpName, &cursor]() { writeConflictRetry( opCtx, "retryRestoreCursor", ns, [&cursor] { cursor->restore(); }); }); diff --git a/src/mongo/db/catalog/rename_collection_test.cpp b/src/mongo/db/catalog/rename_collection_test.cpp index 3d56994f277..2421616a73a 100644 --- a/src/mongo/db/catalog/rename_collection_test.cpp +++ b/src/mongo/db/catalog/rename_collection_test.cpp @@ -405,7 +405,7 @@ void RenameCollectionTest::tearDown() { void _createCollection(OperationContext* opCtx, const NamespaceString& nss, const CollectionOptions options = {}) { - writeConflictRetry(opCtx, "_createCollection", nss.ns(), [=] { + writeConflictRetry(opCtx, "_createCollection", nss, [=] { AutoGetDb autoDb(opCtx, nss.dbName(), MODE_X); auto db = autoDb.ensureDbExists(opCtx); ASSERT_TRUE(db) << "Cannot create collection " << nss.toStringForErrorMsg() @@ -490,7 +490,7 @@ bool _isTempCollection(OperationContext* opCtx, const NamespaceString& nss) { void _createIndexOnEmptyCollection(OperationContext* opCtx, const NamespaceString& nss, const std::string& indexName) { - writeConflictRetry(opCtx, "_createIndexOnEmptyCollection", nss.ns(), [=] { + writeConflictRetry(opCtx, "_createIndexOnEmptyCollection", nss, [=] { AutoGetCollection collection(opCtx, nss, MODE_X); ASSERT_TRUE(collection) << "Cannot create index on empty collection " << nss.toStringForErrorMsg() << " because collection " @@ -515,7 +515,7 @@ void _createIndexOnEmptyCollection(OperationContext* opCtx, * Inserts a single document into a collection. */ void _insertDocument(OperationContext* opCtx, const NamespaceString& nss, const BSONObj& doc) { - writeConflictRetry(opCtx, "_insertDocument", nss.ns(), [=] { + writeConflictRetry(opCtx, "_insertDocument", nss, [=] { AutoGetCollection collection(opCtx, nss, MODE_X); ASSERT_TRUE(collection) << "Cannot insert document " << doc << " into collection " << nss.toStringForErrorMsg() << " because collection " diff --git a/src/mongo/db/catalog/validate_adaptor.cpp b/src/mongo/db/catalog/validate_adaptor.cpp index 92a8937bdf6..818ed98e73b 100644 --- a/src/mongo/db/catalog/validate_adaptor.cpp +++ b/src/mongo/db/catalog/validate_adaptor.cpp @@ -668,12 +668,11 @@ void ValidateAdaptor::traverseRecordStore(OperationContext* opCtx, } if (_validateState->fixErrors()) { - writeConflictRetry( - opCtx, "corrupt record removal", _validateState->nss().ns(), [&] { - WriteUnitOfWork wunit(opCtx); - rs->deleteRecord(opCtx, record->id); - wunit.commit(); - }); + writeConflictRetry(opCtx, "corrupt record removal", _validateState->nss(), [&] { + WriteUnitOfWork wunit(opCtx); + rs->deleteRecord(opCtx, record->id); + wunit.commit(); + }); results->repaired = true; results->numRemovedCorruptRecords++; _numRecords--; diff --git a/src/mongo/db/change_stream_change_collection_manager.cpp b/src/mongo/db/change_stream_change_collection_manager.cpp index 93c1007cfb3..23c780ed5f1 100644 --- a/src/mongo/db/change_stream_change_collection_manager.cpp +++ b/src/mongo/db/change_stream_change_collection_manager.cpp @@ -671,13 +671,11 @@ size_t ChangeStreamChangeCollectionManager::removeExpiredChangeCollectionsDocume // No marker means it's a new collection, or we've just performed startup. Initialize // the TruncateMarkers instance. if (!truncateMarkers) { - writeConflictRetry(opCtx, - "initialise change collection truncate markers", - changeCollectionPtr->ns().ns(), - [&] { - truncateMarkers = initialiseTruncateMarkers( - opCtx, changeCollectionPtr.get(), truncateMap); - }); + writeConflictRetry( + opCtx, "initialise change collection truncate markers", changeCollectionPtr->ns(), [&] { + truncateMarkers = + initialiseTruncateMarkers(opCtx, changeCollectionPtr.get(), truncateMap); + }); } int64_t numRecordsDeleted = 0; @@ -685,41 +683,40 @@ size_t ChangeStreamChangeCollectionManager::removeExpiredChangeCollectionsDocume auto removeExpiredMarkers = [&] { auto rs = changeCollectionPtr->getRecordStore(); while (auto marker = truncateMarkers->peekOldestMarkerIfNeeded(opCtx)) { - writeConflictRetry( - opCtx, "truncate change collection", changeCollectionPtr->ns().ns(), [&] { - // The session might be in use from marker initialisation so we must reset it - // here in order to allow an untimestamped write. - opCtx->recoveryUnit()->abandonSnapshot(); - opCtx->recoveryUnit()->allowOneUntimestampedWrite(); - WriteUnitOfWork wuow(opCtx); - - auto bytesDeleted = marker->bytes; - auto docsDeleted = marker->records; - - auto status = rs->rangeTruncate( - opCtx, - // Truncate from the beginning of the collection, this will - // cover cases where some leftover documents are present. - RecordId(), - marker->lastRecord, - -bytesDeleted, - -docsDeleted); - invariantStatusOK(status); - - wuow.commit(); - - truncateMarkers->popOldestMarker(); - numRecordsDeleted += docsDeleted; - - auto& purgingJobStats = changeCollectionManager.getPurgingJobStats(); - purgingJobStats.docsDeleted.fetchAndAddRelaxed(docsDeleted); - purgingJobStats.bytesDeleted.fetchAndAddRelaxed(bytesDeleted); - - auto millisWallTime = marker->wallTime.toMillisSinceEpoch(); - if (purgingJobStats.maxStartWallTimeMillis.load() < millisWallTime) { - purgingJobStats.maxStartWallTimeMillis.store(millisWallTime); - } - }); + writeConflictRetry(opCtx, "truncate change collection", changeCollectionPtr->ns(), [&] { + // The session might be in use from marker initialisation so we must reset it + // here in order to allow an untimestamped write. + opCtx->recoveryUnit()->abandonSnapshot(); + opCtx->recoveryUnit()->allowOneUntimestampedWrite(); + WriteUnitOfWork wuow(opCtx); + + auto bytesDeleted = marker->bytes; + auto docsDeleted = marker->records; + + auto status = + rs->rangeTruncate(opCtx, + // Truncate from the beginning of the collection, this will + // cover cases where some leftover documents are present. + RecordId(), + marker->lastRecord, + -bytesDeleted, + -docsDeleted); + invariantStatusOK(status); + + wuow.commit(); + + truncateMarkers->popOldestMarker(); + numRecordsDeleted += docsDeleted; + + auto& purgingJobStats = changeCollectionManager.getPurgingJobStats(); + purgingJobStats.docsDeleted.fetchAndAddRelaxed(docsDeleted); + purgingJobStats.bytesDeleted.fetchAndAddRelaxed(bytesDeleted); + + auto millisWallTime = marker->wallTime.toMillisSinceEpoch(); + if (purgingJobStats.maxStartWallTimeMillis.load() < millisWallTime) { + purgingJobStats.maxStartWallTimeMillis.store(millisWallTime); + } + }); } }; diff --git a/src/mongo/db/change_stream_pre_images_collection_manager.cpp b/src/mongo/db/change_stream_pre_images_collection_manager.cpp index 503c44785b0..772c46dfc27 100644 --- a/src/mongo/db/change_stream_pre_images_collection_manager.cpp +++ b/src/mongo/db/change_stream_pre_images_collection_manager.cpp @@ -114,7 +114,7 @@ void truncateExpiredMarkersForCollection( Date_t& maxWallTimeForNsTruncateOutput) { while (auto marker = truncateMarkersForNss->peekOldestMarkerIfNeeded(opCtx)) { writeConflictRetry( - opCtx, "truncate pre-images collection for UUID", preImagesColl->ns().ns(), [&] { + opCtx, "truncate pre-images collection for UUID", preImagesColl->ns(), [&] { // The session might be in use from marker initialisation so we must // reset it here in order to allow an untimestamped write. opCtx->recoveryUnit()->abandonSnapshot(); @@ -464,7 +464,7 @@ size_t ChangeStreamPreImagesCollectionManager::_deleteExpiredPreImagesWithCollSc writeConflictRetry( opCtx, "ChangeStreamExpiredPreImagesRemover", - NamespaceString::makePreImageCollectionNSS(boost::none).ns(), + NamespaceString::makePreImageCollectionNSS(boost::none), [&] { auto exec = getDeleteExpiredPreImagesExecutor( opCtx, preImageColl, filterPtr, maxRecordIdTimestamp, *currentCollectionUUID); diff --git a/src/mongo/db/cloner.cpp b/src/mongo/db/cloner.cpp index 4b89594e915..60674fc6f60 100644 --- a/src/mongo/db/cloner.cpp +++ b/src/mongo/db/cloner.cpp @@ -108,7 +108,7 @@ struct DefaultClonerImpl::BatchHandler { auto catalog = CollectionCatalog::get(opCtx); auto collection = catalog->lookupCollectionByNamespace(opCtx, nss); if (!collection) { - writeConflictRetry(opCtx, "createCollection", nss.ns(), [&] { + writeConflictRetry(opCtx, "createCollection", nss, [&] { opCtx->checkForInterrupt(); WriteUnitOfWork wunit(opCtx); @@ -190,7 +190,7 @@ struct DefaultClonerImpl::BatchHandler { verify(collection); ++numSeen; - writeConflictRetry(opCtx, "cloner insert", nss.ns(), [&] { + writeConflictRetry(opCtx, "cloner insert", nss, [&] { opCtx->checkForInterrupt(); WriteUnitOfWork wunit(opCtx); @@ -302,7 +302,7 @@ void DefaultClonerImpl::_copyIndexes(OperationContext* opCtx, } auto fromMigrate = false; - writeConflictRetry(opCtx, "_copyIndexes", nss.ns(), [&] { + writeConflictRetry(opCtx, "_copyIndexes", nss, [&] { WriteUnitOfWork wunit(opCtx); IndexBuildsCoordinator::get(opCtx)->createIndexesOnEmptyCollection( opCtx, collection, indexesToBuild, fromMigrate); @@ -367,7 +367,7 @@ Status DefaultClonerImpl::_createCollectionsForDb( const NamespaceString nss(dbName, params.collectionName); uassertStatusOK(userAllowedCreateNS(opCtx, nss)); - Status status = writeConflictRetry(opCtx, "createCollection", nss.ns(), [&] { + Status status = writeConflictRetry(opCtx, "createCollection", nss, [&] { opCtx->checkForInterrupt(); WriteUnitOfWork wunit(opCtx); diff --git a/src/mongo/db/commands/bulk_write.cpp b/src/mongo/db/commands/bulk_write.cpp index c45f4b8d68e..c2d989ef9a7 100644 --- a/src/mongo/db/commands/bulk_write.cpp +++ b/src/mongo/db/commands/bulk_write.cpp @@ -631,7 +631,7 @@ bool handleUpdateOp(OperationContext* opCtx, // is executing an update. This is done to ensure that we can always match, // modify, and return the document under concurrency, if a matching document exists. lastOpFixer.startingOp(nsString); - return writeConflictRetry(opCtx, "bulkWriteUpdate", nsString.ns(), [&] { + return writeConflictRetry(opCtx, "bulkWriteUpdate", nsString, [&] { if (MONGO_unlikely(hangBeforeBulkWritePerformsUpdate.shouldFail())) { CurOpFailpointHelpers::waitWhileFailPointEnabled( &hangBeforeBulkWritePerformsUpdate, opCtx, "hangBeforeBulkWritePerformsUpdate"); @@ -769,7 +769,7 @@ bool handleDeleteOp(OperationContext* opCtx, const bool inTransaction = opCtx->inMultiDocumentTransaction(); lastOpFixer.startingOp(nsString); - return writeConflictRetry(opCtx, "bulkWriteDelete", nsString.ns(), [&] { + return writeConflictRetry(opCtx, "bulkWriteDelete", nsString, [&] { boost::optional<BSONObj> docFound; auto nDeleted = write_ops_exec::writeConflictRetryRemove( opCtx, nsString, &deleteRequest, curOp, opDebug, inTransaction, docFound); diff --git a/src/mongo/db/commands/create_indexes_cmd.cpp b/src/mongo/db/commands/create_indexes_cmd.cpp index ce94c92662a..5c055945f8b 100644 --- a/src/mongo/db/commands/create_indexes_cmd.cpp +++ b/src/mongo/db/commands/create_indexes_cmd.cpp @@ -497,7 +497,7 @@ CreateIndexesReply runCreateIndexesWithCoordinator(OperationContext* opCtx, << "Not primary while creating indexes in " << ns.toStringForErrorMsg()); } - bool indexExists = writeConflictRetry(opCtx, "createCollectionWithIndexes", ns.ns(), [&] { + bool indexExists = writeConflictRetry(opCtx, "createCollectionWithIndexes", ns, [&] { AutoGetCollection collection( opCtx, ns, diff --git a/src/mongo/db/commands/dbcheck.cpp b/src/mongo/db/commands/dbcheck.cpp index 3fce4b46a11..62fcf2dd73d 100644 --- a/src/mongo/db/commands/dbcheck.cpp +++ b/src/mongo/db/commands/dbcheck.cpp @@ -71,7 +71,7 @@ repl::OpTime _logOp(OperationContext* opCtx, oplogEntry.setObject(obj); AutoGetOplog oplogWrite(opCtx, OplogAccessMode::kWrite); return writeConflictRetry( - opCtx, "dbCheck oplog entry", NamespaceString::kRsOplogNamespace.ns(), [&] { + opCtx, "dbCheck oplog entry", NamespaceString::kRsOplogNamespace, [&] { auto const clockSource = opCtx->getServiceContext()->getFastClockSource(); oplogEntry.setWallClockTime(clockSource->now()); diff --git a/src/mongo/db/commands/dbcommands_d.cpp b/src/mongo/db/commands/dbcommands_d.cpp index 16f3f9c457d..a77ed7e7260 100644 --- a/src/mongo/db/commands/dbcommands_d.cpp +++ b/src/mongo/db/commands/dbcommands_d.cpp @@ -292,7 +292,7 @@ public: BSONObj query = BSON("files_id" << jsobj["filemd5"] << "n" << GTE << n); BSONObj sort = BSON("files_id" << 1 << "n" << 1); - return writeConflictRetry(opCtx, "filemd5", toStringForLogging(dbName), [&] { + return writeConflictRetry(opCtx, "filemd5", NamespaceString(dbName), [&] { auto findCommand = std::make_unique<FindCommandRequest>(nss); findCommand->setFilter(query.getOwned()); findCommand->setSort(sort.getOwned()); diff --git a/src/mongo/db/commands/drop_indexes_cmd.cpp b/src/mongo/db/commands/drop_indexes_cmd.cpp index 4cdb380c119..57611e1fb33 100644 --- a/src/mongo/db/commands/drop_indexes_cmd.cpp +++ b/src/mongo/db/commands/drop_indexes_cmd.cpp @@ -192,7 +192,7 @@ public: std::vector<BSONObj> all; { std::vector<std::string> indexNames; - writeConflictRetry(opCtx, "listIndexes", toReIndexNss.ns(), [&] { + writeConflictRetry(opCtx, "listIndexes", toReIndexNss, [&] { indexNames.clear(); acquisition.getCollectionPtr()->getAllIndexes(&indexNames); }); @@ -201,7 +201,7 @@ public: for (size_t i = 0; i < indexNames.size(); i++) { const std::string& name = indexNames[i]; - BSONObj spec = writeConflictRetry(opCtx, "getIndexSpec", toReIndexNss.ns(), [&] { + BSONObj spec = writeConflictRetry(opCtx, "getIndexSpec", toReIndexNss, [&] { return acquisition.getCollectionPtr()->getIndexSpec(name); }); @@ -241,7 +241,7 @@ public: indexer->setIndexBuildMethod(IndexBuildMethod::kForeground); StatusWith<std::vector<BSONObj>> swIndexesToRebuild(ErrorCodes::UnknownError, "Uninitialized"); - writeConflictRetry(opCtx, "dropAllIndexes", toReIndexNss.ns(), [&] { + writeConflictRetry(opCtx, "dropAllIndexes", toReIndexNss, [&] { WriteUnitOfWork wunit(opCtx); CollectionWriter collection(opCtx, &acquisition); collection.getWritableCollection(opCtx)->getIndexCatalog()->dropAllIndexes( @@ -274,7 +274,7 @@ public: uassertStatusOK(indexer->checkConstraints(opCtx, acquisition.getCollectionPtr())); - writeConflictRetry(opCtx, "commitReIndex", toReIndexNss.ns(), [&] { + writeConflictRetry(opCtx, "commitReIndex", toReIndexNss, [&] { WriteUnitOfWork wunit(opCtx); CollectionWriter collection(opCtx, &acquisition); uassertStatusOK(indexer->commit(opCtx, diff --git a/src/mongo/db/commands/find_and_modify.cpp b/src/mongo/db/commands/find_and_modify.cpp index 91bd5d63962..9edaaab77bf 100644 --- a/src/mongo/db/commands/find_and_modify.cpp +++ b/src/mongo/db/commands/find_and_modify.cpp @@ -495,7 +495,7 @@ write_ops::FindAndModifyCommandReply CmdFindAndModify::Invocation::typedRun( // Although usually the PlanExecutor handles WCE internally, it will throw WCEs when it // is executing a findAndModify. This is done to ensure that we can always match, // modify, and return the document under concurrency, if a matching document exists. - return writeConflictRetry(opCtx, "findAndModify", nsString.ns(), [&] { + return writeConflictRetry(opCtx, "findAndModify", nsString, [&] { if (req.getRemove().value_or(false)) { DeleteRequest deleteRequest; makeDeleteRequest(opCtx, req, false, &deleteRequest); diff --git a/src/mongo/db/commands/fsync.cpp b/src/mongo/db/commands/fsync.cpp index 581501ea841..6da5e2cf9a0 100644 --- a/src/mongo/db/commands/fsync.cpp +++ b/src/mongo/db/commands/fsync.cpp @@ -28,6 +28,7 @@ */ +#include "mongo/db/namespace_string.h" #include "mongo/platform/basic.h" #include "mongo/db/commands/fsync.h" @@ -388,9 +389,11 @@ void FSyncLockThread::run() { bool successfulFsyncLock = false; auto backupCursorHooks = BackupCursorHooks::get(_serviceContext); try { + // TODO SERVER-65920: Create a NamespaceString for logging with the "global" ns in + // writeConflictRetry. writeConflictRetry(&opCtx, "beginBackup", - "global", + NamespaceString("global"), [&opCtx, backupCursorHooks, &successfulFsyncLock, storageEngine] { if (backupCursorHooks->enabled()) { backupCursorHooks->fsyncLock(&opCtx); diff --git a/src/mongo/db/commands/list_databases_common.h b/src/mongo/db/commands/list_databases_common.h index 26a3ff7ea1d..d2997966d8e 100644 --- a/src/mongo/db/commands/list_databases_common.h +++ b/src/mongo/db/commands/list_databases_common.h @@ -118,7 +118,7 @@ int64_t setReplyItems(OperationContext* opCtx, continue; } - writeConflictRetry(opCtx, "sizeOnDisk", toStringForLogging(dbName), [&] { + writeConflictRetry(opCtx, "sizeOnDisk", NamespaceString(dbName), [&] { size = storageEngine->sizeOnDiskForDb(opCtx, dbName); }); item.setSizeOnDisk(size); diff --git a/src/mongo/db/commands/mr_test.cpp b/src/mongo/db/commands/mr_test.cpp index 5a97f63a3bf..855f2c3bbfb 100644 --- a/src/mongo/db/commands/mr_test.cpp +++ b/src/mongo/db/commands/mr_test.cpp @@ -548,7 +548,7 @@ TEST_F(MapReduceCommandTest, ReplacingExistingOutputCollectionPreservesIndexes) AutoGetCollection coll(_opCtx.get(), outputNss, MODE_X); ASSERT(coll); writeConflictRetry( - _opCtx.get(), "ReplacingExistingOutputCollectionPreservesIndexes", outputNss.ns(), [&] { + _opCtx.get(), "ReplacingExistingOutputCollectionPreservesIndexes", outputNss, [&] { WriteUnitOfWork wuow(_opCtx.get()); ASSERT_OK( coll.getWritableCollection(_opCtx.get()) diff --git a/src/mongo/db/commands/oplog_note.cpp b/src/mongo/db/commands/oplog_note.cpp index 9ed7f7f3060..ccc9588997b 100644 --- a/src/mongo/db/commands/oplog_note.cpp +++ b/src/mongo/db/commands/oplog_note.cpp @@ -77,7 +77,7 @@ Status _performNoopWrite(OperationContext* opCtx, BSONObj msgObj, StringData not return {ErrorCodes::NotWritablePrimary, "Not a primary"}; } - writeConflictRetry(opCtx, note, NamespaceString::kRsOplogNamespace.ns(), [&opCtx, &msgObj] { + writeConflictRetry(opCtx, note, NamespaceString::kRsOplogNamespace, [&opCtx, &msgObj] { WriteUnitOfWork uow(opCtx); opCtx->getClient()->getServiceContext()->getOpObserver()->onOpMessage(opCtx, msgObj); uow.commit(); diff --git a/src/mongo/db/commands/resize_oplog.cpp b/src/mongo/db/commands/resize_oplog.cpp index 0c96a36e5d4..8daebc4ecd6 100644 --- a/src/mongo/db/commands/resize_oplog.cpp +++ b/src/mongo/db/commands/resize_oplog.cpp @@ -89,7 +89,7 @@ public: auto params = ReplSetResizeOplogRequest::parse(IDLParserContext("replSetResizeOplog"), jsobj); - return writeConflictRetry(opCtx, "replSetResizeOplog", coll->ns().ns(), [&] { + return writeConflictRetry(opCtx, "replSetResizeOplog", coll->ns(), [&] { WriteUnitOfWork wunit(opCtx); if (auto sizeMB = params.getSize()) { diff --git a/src/mongo/db/concurrency/d_concurrency_test.cpp b/src/mongo/db/concurrency/d_concurrency_test.cpp index fd3c6e5de70..db14817b030 100644 --- a/src/mongo/db/concurrency/d_concurrency_test.cpp +++ b/src/mongo/db/concurrency/d_concurrency_test.cpp @@ -183,7 +183,7 @@ public: TEST_F(DConcurrencyTestFixture, WriteConflictRetryInstantiatesOK) { auto opCtx = makeOperationContext(); getClient()->swapLockState(std::make_unique<LockerImpl>(opCtx->getServiceContext())); - writeConflictRetry(opCtx.get(), "", "", [] {}); + writeConflictRetry(opCtx.get(), "", NamespaceString(), [] {}); } TEST_F(DConcurrencyTestFixture, WriteConflictRetryRetriesFunctionOnWriteConflictException) { @@ -191,7 +191,7 @@ TEST_F(DConcurrencyTestFixture, WriteConflictRetryRetriesFunctionOnWriteConflict getClient()->swapLockState(std::make_unique<LockerImpl>(opCtx->getServiceContext())); auto&& opDebug = CurOp::get(opCtx.get())->debug(); ASSERT_EQUALS(0, opDebug.additiveMetrics.writeConflicts.load()); - ASSERT_EQUALS(100, writeConflictRetry(opCtx.get(), "", "", [&opDebug] { + ASSERT_EQUALS(100, writeConflictRetry(opCtx.get(), "", NamespaceString(), [&opDebug] { if (0 == opDebug.additiveMetrics.writeConflicts.load()) { throwWriteConflictException( str::stream() @@ -208,7 +208,7 @@ TEST_F(DConcurrencyTestFixture, WriteConflictRetryPropagatesNonWriteConflictExce getClient()->swapLockState(std::make_unique<LockerImpl>(opCtx->getServiceContext())); ASSERT_THROWS_CODE(writeConflictRetry(opCtx.get(), "", - "", + NamespaceString(), [] { uassert(ErrorCodes::OperationFailed, "", false); MONGO_UNREACHABLE; @@ -226,7 +226,7 @@ TEST_F(DConcurrencyTestFixture, ASSERT_THROWS(writeConflictRetry( opCtx.get(), "", - "", + NamespaceString(), [] { throwWriteConflictException( str::stream() << "Verify that WriteConflictExceptions are propogated " diff --git a/src/mongo/db/concurrency/deferred_writer.cpp b/src/mongo/db/concurrency/deferred_writer.cpp index 558f3e282cc..a74a2723fdb 100644 --- a/src/mongo/db/concurrency/deferred_writer.cpp +++ b/src/mongo/db/concurrency/deferred_writer.cpp @@ -111,7 +111,7 @@ Status DeferredWriter::_worker(InsertStatement stmt) noexcept try { const CollectionPtr& collection = agc->getCollection(); - Status status = writeConflictRetry(opCtx, "deferred insert", _nss.ns(), [&] { + Status status = writeConflictRetry(opCtx, "deferred insert", _nss, [&] { WriteUnitOfWork wuow(opCtx); Status status = collection_internal::insertDocument(opCtx, collection, stmt, nullptr, false); diff --git a/src/mongo/db/concurrency/exception_util.h b/src/mongo/db/concurrency/exception_util.h index dca5b4f109e..103015d2d25 100644 --- a/src/mongo/db/concurrency/exception_util.h +++ b/src/mongo/db/concurrency/exception_util.h @@ -126,7 +126,10 @@ template <ErrorCodes::Error ec> * invocation of the argument function f without any exception handling and retry logic. */ template <typename F> -auto writeConflictRetry(OperationContext* opCtx, StringData opStr, StringData ns, F&& f) { +auto writeConflictRetry(OperationContext* opCtx, + StringData opStr, + const NamespaceStringOrUUID& nssOrUUID, + F&& f) { invariant(opCtx); invariant(opCtx->lockState()); invariant(opCtx->recoveryUnit()); @@ -141,7 +144,9 @@ auto writeConflictRetry(OperationContext* opCtx, StringData opStr, StringData ns return f(); } catch (TemporarilyUnavailableException const& e) { if (opCtx->inMultiDocumentTransaction()) { - handleTemporarilyUnavailableExceptionInTransaction(opCtx, opStr, ns, e); + // TODO SERVER-76897: use nssOrUUID.toStringForLogging(). + handleTemporarilyUnavailableExceptionInTransaction( + opCtx, opStr, nssOrUUID.toStringForErrorMsg(), e); } throw; } @@ -154,13 +159,16 @@ auto writeConflictRetry(OperationContext* opCtx, StringData opStr, StringData ns return f(); } catch (WriteConflictException const& e) { CurOp::get(opCtx)->debug().additiveMetrics.incrementWriteConflicts(1); - logWriteConflictAndBackoff(writeConflictAttempts, opStr, e.reason(), ns); + logWriteConflictAndBackoff( + writeConflictAttempts, opStr, e.reason(), nssOrUUID.toStringForErrorMsg()); ++writeConflictAttempts; opCtx->recoveryUnit()->abandonSnapshot(); } catch (TemporarilyUnavailableException const& e) { - handleTemporarilyUnavailableException(opCtx, ++attemptsTempUnavailable, opStr, ns, e); + handleTemporarilyUnavailableException( + opCtx, ++attemptsTempUnavailable, opStr, nssOrUUID.toStringForErrorMsg(), e); } catch (TransactionTooLargeForCacheException const& e) { - handleTransactionTooLargeForCacheException(opCtx, &writeConflictAttempts, opStr, ns, e); + handleTransactionTooLargeForCacheException( + opCtx, &writeConflictAttempts, opStr, nssOrUUID.toStringForErrorMsg(), e); } } } diff --git a/src/mongo/db/exec/upsert_stage.cpp b/src/mongo/db/exec/upsert_stage.cpp index ce9f0bad19c..db33a1a543a 100644 --- a/src/mongo/db/exec/upsert_stage.cpp +++ b/src/mongo/db/exec/upsert_stage.cpp @@ -170,7 +170,7 @@ void UpsertStage::_performInsert(BSONObj newDocument) { &hangBeforeUpsertPerformsInsert, opCtx(), "hangBeforeUpsertPerformsInsert"); } - writeConflictRetry(opCtx(), "upsert", collection()->ns().ns(), [&] { + writeConflictRetry(opCtx(), "upsert", collection()->ns(), [&] { WriteUnitOfWork wunit(opCtx()); InsertStatement insertStmt(_params.request->getStmtIds(), newDocument); diff --git a/src/mongo/db/fle_crud_mongod.cpp b/src/mongo/db/fle_crud_mongod.cpp index 1aea4755bc5..bcc9f3cde2e 100644 --- a/src/mongo/db/fle_crud_mongod.cpp +++ b/src/mongo/db/fle_crud_mongod.cpp @@ -491,7 +491,7 @@ std::vector<std::vector<FLEEdgeCountInfo>> getTagsFromStorage( auto opStr = "getTagsFromStorage"_sd; return writeConflictRetry( - opCtx, opStr, nsOrUUID.toString(), [&]() -> std::vector<std::vector<FLEEdgeCountInfo>> { + opCtx, opStr, nsOrUUID, [&]() -> std::vector<std::vector<FLEEdgeCountInfo>> { AutoGetCollectionForReadMaybeLockFree autoColl(opCtx, nsOrUUID); const auto& collection = autoColl.getCollection(); diff --git a/src/mongo/db/global_index.cpp b/src/mongo/db/global_index.cpp index a31e7e3889c..e788cf4f640 100644 --- a/src/mongo/db/global_index.cpp +++ b/src/mongo/db/global_index.cpp @@ -120,7 +120,7 @@ void createContainer(OperationContext* opCtx, const UUID& indexUUID) { LOGV2(6789200, "Create global index container", "indexUUID"_attr = indexUUID); // Create the container. - return writeConflictRetry(opCtx, "createGlobalIndexContainer", nss.ns(), [&]() { + return writeConflictRetry(opCtx, "createGlobalIndexContainer", nss, [&]() { const auto indexKeySpec = BSON("v" << 2 << "name" << kContainerIndexKeyFieldName.toString() + "_1" << "key" << BSON(kContainerIndexKeyFieldName << 1) << "unique" << true); @@ -188,7 +188,7 @@ void dropContainer(OperationContext* opCtx, const UUID& indexUUID) { LOGV2(6789300, "Drop global index container", "indexUUID"_attr = indexUUID); // Drop the container. - return writeConflictRetry(opCtx, "dropGlobalIndexContainer", nss.ns(), [&]() { + return writeConflictRetry(opCtx, "dropGlobalIndexContainer", nss, [&]() { AutoGetCollection autoColl(opCtx, nss, MODE_X); if (!autoColl) { // Idempotent command, return OK if the collection is non-existing. @@ -230,7 +230,7 @@ void insertKey(OperationContext* opCtx, const auto indexEntry = buildIndexEntry(key, docKey); // Insert the index entry. - writeConflictRetry(opCtx, "insertGlobalIndexKey", ns.toString(), [&] { + writeConflictRetry(opCtx, "insertGlobalIndexKey", ns, [&] { WriteUnitOfWork wuow(opCtx); AutoGetCollection autoColl(opCtx, ns, MODE_IX); auto& container = autoColl.getCollection(); @@ -300,7 +300,7 @@ void deleteKey(OperationContext* opCtx, const auto ns = NamespaceString::makeGlobalIndexNSS(indexUUID); // Find and delete the index entry. - writeConflictRetry(opCtx, "deleteGlobalIndexKey", ns.toString(), [&] { + writeConflictRetry(opCtx, "deleteGlobalIndexKey", ns, [&] { WriteUnitOfWork wuow(opCtx); const auto coll = acquireCollection( diff --git a/src/mongo/db/index/bulk_builder_common.h b/src/mongo/db/index/bulk_builder_common.h index 925fff50b0a..624e7dc1725 100644 --- a/src/mongo/db/index/bulk_builder_common.h +++ b/src/mongo/db/index/bulk_builder_common.h @@ -142,7 +142,7 @@ public: try { - writeConflictRetry(opCtx, "addingKey", _ns.ns(), [&] { + writeConflictRetry(opCtx, "addingKey", _ns, [&] { WriteUnitOfWork wunit(opCtx); static_cast<T*>(this)->insertKey(builder, data); wunit.commit(); diff --git a/src/mongo/db/index/index_build_interceptor.cpp b/src/mongo/db/index/index_build_interceptor.cpp index c80748bc9df..a8704fd9239 100644 --- a/src/mongo/db/index/index_build_interceptor.cpp +++ b/src/mongo/db/index/index_build_interceptor.cpp @@ -270,8 +270,7 @@ Status IndexBuildInterceptor::drainWritesIntoIndex(OperationContext* opCtx, // Apply batches of side writes until the last record in the table is seen. while (!atEof) { - auto swAtEof = - writeConflictRetry(opCtx, "index build drain", coll->ns().ns(), applySingleBatch); + auto swAtEof = writeConflictRetry(opCtx, "index build drain", coll->ns(), applySingleBatch); if (!swAtEof.isOK()) { return swAtEof.getStatus(); } diff --git a/src/mongo/db/index/skipped_record_tracker.cpp b/src/mongo/db/index/skipped_record_tracker.cpp index e2ab8991a51..d62887f7eb8 100644 --- a/src/mongo/db/index/skipped_record_tracker.cpp +++ b/src/mongo/db/index/skipped_record_tracker.cpp @@ -83,10 +83,7 @@ void SkippedRecordTracker::record(OperationContext* opCtx, const RecordId& recor } writeConflictRetry( - opCtx, - "recordSkippedRecordTracker", - NamespaceString::kIndexBuildEntryNamespace.ns(), - [&]() { + opCtx, "recordSkippedRecordTracker", NamespaceString::kIndexBuildEntryNamespace, [&]() { WriteUnitOfWork wuow(opCtx); uassertStatusOK( _skippedRecordsTable->rs() diff --git a/src/mongo/db/index_build_entry_helpers.cpp b/src/mongo/db/index_build_entry_helpers.cpp index 42f05b454c1..c5e832c2d4c 100644 --- a/src/mongo/db/index_build_entry_helpers.cpp +++ b/src/mongo/db/index_build_entry_helpers.cpp @@ -54,11 +54,10 @@ namespace { MONGO_FAIL_POINT_DEFINE(hangBeforeGettingIndexBuildEntry); Status upsert(OperationContext* opCtx, const IndexBuildEntry& indexBuildEntry) { - return writeConflictRetry( opCtx, "upsertIndexBuildEntry", - NamespaceString::kIndexBuildEntryNamespace.ns(), + NamespaceString::kIndexBuildEntryNamespace, [&]() -> Status { auto collection = acquireCollection(opCtx, @@ -119,7 +118,7 @@ Status upsert(OperationContext* opCtx, const BSONObj& filter, const BSONObj& upd return writeConflictRetry( opCtx, "upsertIndexBuildEntry", - NamespaceString::kIndexBuildEntryNamespace.ns(), + NamespaceString::kIndexBuildEntryNamespace, [&]() -> Status { auto collection = acquireCollection(opCtx, @@ -151,7 +150,7 @@ Status update(OperationContext* opCtx, const BSONObj& filter, const BSONObj& upd return writeConflictRetry( opCtx, "updateIndexBuildEntry", - NamespaceString::kIndexBuildEntryNamespace.ns(), + NamespaceString::kIndexBuildEntryNamespace, [&]() -> Status { ; auto collection = @@ -188,7 +187,7 @@ void ensureIndexBuildEntriesNamespaceExists(OperationContext* opCtx) { writeConflictRetry( opCtx, "createIndexBuildCollection", - NamespaceString::kIndexBuildEntryNamespace.ns(), + NamespaceString::kIndexBuildEntryNamespace, [&]() -> void { AutoGetDb autoDb(opCtx, NamespaceString::kIndexBuildEntryNamespace.dbName(), MODE_IX); auto db = autoDb.ensureDbExists(opCtx); @@ -236,10 +235,7 @@ Status persistIndexCommitQuorum(OperationContext* opCtx, const IndexBuildEntry& Status addIndexBuildEntry(OperationContext* opCtx, const IndexBuildEntry& indexBuildEntry) { return writeConflictRetry( - opCtx, - "addIndexBuildEntry", - NamespaceString::kIndexBuildEntryNamespace.ns(), - [&]() -> Status { + opCtx, "addIndexBuildEntry", NamespaceString::kIndexBuildEntryNamespace, [&]() -> Status { AutoGetCollection collection( opCtx, NamespaceString::kIndexBuildEntryNamespace, MODE_IX); if (!collection) { @@ -274,7 +270,7 @@ Status removeIndexBuildEntry(OperationContext* opCtx, return writeConflictRetry( opCtx, "removeIndexBuildEntry", - NamespaceString::kIndexBuildEntryNamespace.ns(), + NamespaceString::kIndexBuildEntryNamespace, [&]() -> Status { if (!collection) { str::stream ss; @@ -319,7 +315,7 @@ StatusWith<IndexBuildEntry> getIndexBuildEntry(OperationContext* opCtx, UUID ind // This operation does not perform any writes, but the index building code is sensitive to // exceptions and we must protect it from unanticipated write conflicts from reads. bool foundObj = writeConflictRetry( - opCtx, "getIndexBuildEntry", NamespaceString::kIndexBuildEntryNamespace.ns(), [&]() { + opCtx, "getIndexBuildEntry", NamespaceString::kIndexBuildEntryNamespace, [&]() { return Helpers::findOne( opCtx, collection.getCollection(), BSON("_id" << indexBuildUUID), obj); }); diff --git a/src/mongo/db/index_builds_coordinator.cpp b/src/mongo/db/index_builds_coordinator.cpp index d11df7344d9..38b1da5703f 100644 --- a/src/mongo/db/index_builds_coordinator.cpp +++ b/src/mongo/db/index_builds_coordinator.cpp @@ -372,10 +372,9 @@ repl::OpTime getLatestOplogOpTime(OperationContext* opCtx) { BSONObj oplogEntryBSON; // This operation does not perform any writes, but the index building code is sensitive to // exceptions and we must protect it from unanticipated write conflicts from reads. - writeConflictRetry( - opCtx, "getLatestOplogOpTime", NamespaceString::kRsOplogNamespace.ns(), [&]() { - invariant(Helpers::getLast(opCtx, NamespaceString::kRsOplogNamespace, oplogEntryBSON)); - }); + writeConflictRetry(opCtx, "getLatestOplogOpTime", NamespaceString::kRsOplogNamespace, [&]() { + invariant(Helpers::getLast(opCtx, NamespaceString::kRsOplogNamespace, oplogEntryBSON)); + }); auto optime = repl::OpTime::parseFromOplogEntry(oplogEntryBSON); invariant(optime.isOK(), @@ -1079,7 +1078,7 @@ void IndexBuildsCoordinator::applyStartIndexBuild(OperationContext* opCtx, // proceeding with building them. if (indexBuildOptions.applicationMode == ApplicationMode::kInitialSync) { auto dbAndUUID = NamespaceStringOrUUID(nss.db().toString(), collUUID); - writeConflictRetry(opCtx, "IndexBuildsCoordinator::applyStartIndexBuild", nss.ns(), [&] { + writeConflictRetry(opCtx, "IndexBuildsCoordinator::applyStartIndexBuild", nss, [&] { WriteUnitOfWork wuow(opCtx); AutoGetCollection coll(opCtx, dbAndUUID, MODE_X); @@ -2284,7 +2283,7 @@ IndexBuildsCoordinator::_filterSpecsAndRegisterBuild(OperationContext* opCtx, // the catalog update when it uses the timestamp from the startIndexBuild, rather than // the commitIndexBuild, oplog entry. writeConflictRetry( - opCtx, "IndexBuildsCoordinator::_filterSpecsAndRegisterBuild", nss.ns(), [&] { + opCtx, "IndexBuildsCoordinator::_filterSpecsAndRegisterBuild", nss, [&] { WriteUnitOfWork wuow(opCtx); createIndexesOnEmptyCollection(opCtx, collection, filteredSpecs, false); wuow.commit(); diff --git a/src/mongo/db/introspect.cpp b/src/mongo/db/introspect.cpp index 81e6b1aaca0..aa52b86af04 100644 --- a/src/mongo/db/introspect.cpp +++ b/src/mongo/db/introspect.cpp @@ -153,7 +153,7 @@ Status createProfileCollection(OperationContext* opCtx, Database* db) { // Checking the collection exists must also be done in the WCE retry loop. Only retrying // collection creation would endlessly throw errors because the collection exists: must check // and see the collection exists in order to break free. - return writeConflictRetry(opCtx, "createProfileCollection", dbProfilingNS.ns(), [&] { + return writeConflictRetry(opCtx, "createProfileCollection", dbProfilingNS, [&] { const Collection* collection = CollectionCatalog::get(opCtx)->lookupCollectionByNamespace(opCtx, dbProfilingNS); if (collection) { diff --git a/src/mongo/db/op_observer/op_observer_impl.cpp b/src/mongo/db/op_observer/op_observer_impl.cpp index 6f4bf660c26..e3ed6a0159c 100644 --- a/src/mongo/db/op_observer/op_observer_impl.cpp +++ b/src/mongo/db/op_observer/op_observer_impl.cpp @@ -1737,7 +1737,7 @@ void logCommitOrAbortForPreparedTransaction(OperationContext* opCtx, invariant(!opCtx->lockState()->hasMaxLockTimeout()); writeConflictRetry( - opCtx, "onPreparedTransactionCommitOrAbort", NamespaceString::kRsOplogNamespace.ns(), [&] { + opCtx, "onPreparedTransactionCommitOrAbort", NamespaceString::kRsOplogNamespace, [&] { // Writes to the oplog only require a Global intent lock. Guaranteed by // OplogSlotReserver. invariant(opCtx->lockState()->isWriteLocked()); @@ -2059,107 +2059,106 @@ void OpObserverImpl::onTransactionPrepare( invariant(reservedSlots.size() >= statements.size()); TransactionParticipant::SideTransactionBlock sideTxn(opCtx); - writeConflictRetry( - opCtx, "onTransactionPrepare", NamespaceString::kRsOplogNamespace.ns(), [&] { + writeConflictRetry(opCtx, "onTransactionPrepare", NamespaceString::kRsOplogNamespace, [&] { + // Writes to the oplog only require a Global intent lock. Guaranteed by + // OplogSlotReserver. + invariant(opCtx->lockState()->isWriteLocked()); + + WriteUnitOfWork wuow(opCtx); + // It is possible that the transaction resulted in no changes, In that case, we + // should not write any operations other than the prepare oplog entry. + if (!statements.empty()) { + // Storage transaction commit is the last place inside a transaction that can + // throw an exception. In order to safely allow exceptions to be thrown at that + // point, this function must be called from an outer WriteUnitOfWork in order to + // be rolled back upon reaching the exception. + invariant(opCtx->lockState()->inAWriteUnitOfWork()); + // Writes to the oplog only require a Global intent lock. Guaranteed by // OplogSlotReserver. invariant(opCtx->lockState()->isWriteLocked()); - WriteUnitOfWork wuow(opCtx); - // It is possible that the transaction resulted in no changes, In that case, we - // should not write any operations other than the prepare oplog entry. - if (!statements.empty()) { - // Storage transaction commit is the last place inside a transaction that can - // throw an exception. In order to safely allow exceptions to be thrown at that - // point, this function must be called from an outer WriteUnitOfWork in order to - // be rolled back upon reaching the exception. - invariant(opCtx->lockState()->inAWriteUnitOfWork()); - - // Writes to the oplog only require a Global intent lock. Guaranteed by - // OplogSlotReserver. - invariant(opCtx->lockState()->isWriteLocked()); - - if (applyOpsOperationAssignment.applyOpsEntries.size() > 1U) { - // Partial transactions create/reserve multiple oplog entries in the same - // WriteUnitOfWork. Because of this, such transactions will set multiple - // timestamps, violating the multi timestamp constraint. It's safe to ignore - // the multi timestamp constraints here as additional rollback logic is in - // place for this case. See SERVER-48771. - opCtx->recoveryUnit()->ignoreAllMultiTimestampConstraints(); - } - - // This is set for every oplog entry, except for the last one, in the applyOps - // chain of an unprepared multi-doc transaction. - // For a single prepare oplog entry, choose the last oplog slot for the first - // optime of the transaction. The first optime corresponds to the 'startOpTime' - // field in SessionTxnRecord that is persisted in config.transactions. - // See SERVER-40678. - auto startOpTime = applyOpsOperationAssignment.applyOpsEntries.size() == 1U - ? reservedSlots.back() - : reservedSlots.front(); - - auto logApplyOpsForPreparedTransaction = - [opCtx, oplogWriter = _oplogWriter.get(), startOpTime]( - repl::MutableOplogEntry* oplogEntry, - bool firstOp, - bool lastOp, - std::vector<StmtId> stmtIdsWritten) { - return logApplyOps(opCtx, - oplogEntry, - /*txnState=*/ - (lastOp ? DurableTxnStateEnum::kPrepared - : DurableTxnStateEnum::kInProgress), - startOpTime, - std::move(stmtIdsWritten), - /*updateTxnTable=*/(firstOp || lastOp), - oplogWriter); - }; - - // We had reserved enough oplog slots for the worst case where each operation - // produced one oplog entry. When operations are smaller and can be packed, we - // will waste the extra slots. The implicit prepare oplog entry will still use - // the last reserved slot, because the transaction participant has already used - // that as the prepare time. - boost::optional<repl::ReplOperation::ImageBundle> imageToWrite; - invariant(applyOpsOperationAssignment.prepare); - (void)transactionOperations.logOplogEntries(reservedSlots, - applyOpsOperationAssignment, - wallClockTime, - logApplyOpsForPreparedTransaction, - &imageToWrite); - if (imageToWrite) { - writeToImageCollection(opCtx, *opCtx->getLogicalSessionId(), *imageToWrite); - } - } else { - // Log an empty 'prepare' oplog entry. - // We need to have at least one reserved slot. - invariant(reservedSlots.size() > 0); - BSONObjBuilder applyOpsBuilder; - BSONArrayBuilder opsArray(applyOpsBuilder.subarrayStart("applyOps"_sd)); - opsArray.done(); - applyOpsBuilder.append("prepare", true); - - auto oplogSlot = reservedSlots.front(); - MutableOplogEntry oplogEntry; - oplogEntry.setOpType(repl::OpTypeEnum::kCommand); - oplogEntry.setNss(NamespaceString::kAdminCommandNamespace); - oplogEntry.setOpTime(oplogSlot); - oplogEntry.setPrevWriteOpTimeInTransaction(repl::OpTime()); - oplogEntry.setObject(applyOpsBuilder.done()); - oplogEntry.setWallClockTime(wallClockTime); - - // TODO SERVER-69286: set the top-level tenantId here - - logApplyOps(opCtx, - &oplogEntry, - DurableTxnStateEnum::kPrepared, - /*startOpTime=*/oplogSlot, - /*stmtIdsWritten=*/{}, - /*updateTxnTable=*/true, - _oplogWriter.get()); + if (applyOpsOperationAssignment.applyOpsEntries.size() > 1U) { + // Partial transactions create/reserve multiple oplog entries in the same + // WriteUnitOfWork. Because of this, such transactions will set multiple + // timestamps, violating the multi timestamp constraint. It's safe to ignore + // the multi timestamp constraints here as additional rollback logic is in + // place for this case. See SERVER-48771. + opCtx->recoveryUnit()->ignoreAllMultiTimestampConstraints(); + } + + // This is set for every oplog entry, except for the last one, in the applyOps + // chain of an unprepared multi-doc transaction. + // For a single prepare oplog entry, choose the last oplog slot for the first + // optime of the transaction. The first optime corresponds to the 'startOpTime' + // field in SessionTxnRecord that is persisted in config.transactions. + // See SERVER-40678. + auto startOpTime = applyOpsOperationAssignment.applyOpsEntries.size() == 1U + ? reservedSlots.back() + : reservedSlots.front(); + + auto logApplyOpsForPreparedTransaction = + [opCtx, oplogWriter = _oplogWriter.get(), startOpTime]( + repl::MutableOplogEntry* oplogEntry, + bool firstOp, + bool lastOp, + std::vector<StmtId> stmtIdsWritten) { + return logApplyOps(opCtx, + oplogEntry, + /*txnState=*/ + (lastOp ? DurableTxnStateEnum::kPrepared + : DurableTxnStateEnum::kInProgress), + startOpTime, + std::move(stmtIdsWritten), + /*updateTxnTable=*/(firstOp || lastOp), + oplogWriter); + }; + + // We had reserved enough oplog slots for the worst case where each operation + // produced one oplog entry. When operations are smaller and can be packed, we + // will waste the extra slots. The implicit prepare oplog entry will still use + // the last reserved slot, because the transaction participant has already used + // that as the prepare time. + boost::optional<repl::ReplOperation::ImageBundle> imageToWrite; + invariant(applyOpsOperationAssignment.prepare); + (void)transactionOperations.logOplogEntries(reservedSlots, + applyOpsOperationAssignment, + wallClockTime, + logApplyOpsForPreparedTransaction, + &imageToWrite); + if (imageToWrite) { + writeToImageCollection(opCtx, *opCtx->getLogicalSessionId(), *imageToWrite); } - wuow.commit(); - }); + } else { + // Log an empty 'prepare' oplog entry. + // We need to have at least one reserved slot. + invariant(reservedSlots.size() > 0); + BSONObjBuilder applyOpsBuilder; + BSONArrayBuilder opsArray(applyOpsBuilder.subarrayStart("applyOps"_sd)); + opsArray.done(); + applyOpsBuilder.append("prepare", true); + + auto oplogSlot = reservedSlots.front(); + MutableOplogEntry oplogEntry; + oplogEntry.setOpType(repl::OpTypeEnum::kCommand); + oplogEntry.setNss(NamespaceString::kAdminCommandNamespace); + oplogEntry.setOpTime(oplogSlot); + oplogEntry.setPrevWriteOpTimeInTransaction(repl::OpTime()); + oplogEntry.setObject(applyOpsBuilder.done()); + oplogEntry.setWallClockTime(wallClockTime); + + // TODO SERVER-69286: set the top-level tenantId here + + logApplyOps(opCtx, + &oplogEntry, + DurableTxnStateEnum::kPrepared, + /*startOpTime=*/oplogSlot, + /*stmtIdsWritten=*/{}, + /*updateTxnTable=*/true, + _oplogWriter.get()); + } + wuow.commit(); + }); } shardObserveTransactionPrepareOrUnpreparedCommit(opCtx, statements, prepareOpTime); diff --git a/src/mongo/db/op_observer/op_observer_impl_test.cpp b/src/mongo/db/op_observer/op_observer_impl_test.cpp index 17fb2ddc2a5..016ad012007 100644 --- a/src/mongo/db/op_observer/op_observer_impl_test.cpp +++ b/src/mongo/db/op_observer/op_observer_impl_test.cpp @@ -203,7 +203,7 @@ protected: void reset(OperationContext* opCtx, NamespaceString nss, boost::optional<UUID> uuid = boost::none) const { - writeConflictRetry(opCtx, "deleteAll", nss.ns(), [&] { + writeConflictRetry(opCtx, "deleteAll", nss, [&] { opCtx->recoveryUnit()->setTimestampReadSource(RecoveryUnit::ReadSource::kNoTimestamp); opCtx->recoveryUnit()->abandonSnapshot(); diff --git a/src/mongo/db/ops/update.cpp b/src/mongo/db/ops/update.cpp index e8e04a57fa8..faa6f6c00cd 100644 --- a/src/mongo/db/ops/update.cpp +++ b/src/mongo/db/ops/update.cpp @@ -67,7 +67,7 @@ UpdateResult update(OperationContext* opCtx, // The update stage does not create its own collection. As such, if the update is // an upsert, create the collection that the update stage inserts into beforehand. - writeConflictRetry(opCtx, "createCollection", nsString.ns(), [&] { + writeConflictRetry(opCtx, "createCollection", nsString, [&] { if (!coll.exists() && request.isUpsert()) { const bool userInitiatedWritesAndNotPrimary = opCtx->writesAreReplicated() && !repl::ReplicationCoordinator::get(opCtx)->canAcceptWritesFor(opCtx, nsString); diff --git a/src/mongo/db/ops/write_ops_exec.cpp b/src/mongo/db/ops/write_ops_exec.cpp index 9e00b4dab48..7979eb7d3e7 100644 --- a/src/mongo/db/ops/write_ops_exec.cpp +++ b/src/mongo/db/ops/write_ops_exec.cpp @@ -269,7 +269,7 @@ void finishCurOp(OperationContext* opCtx, CurOp* curOp) { } void makeCollection(OperationContext* opCtx, const NamespaceString& ns) { - writeConflictRetry(opCtx, "implicit collection creation", ns.ns(), [&opCtx, &ns] { + writeConflictRetry(opCtx, "implicit collection creation", ns, [&opCtx, &ns] { AutoGetDb autoDb(opCtx, ns.dbName(), MODE_IX); Lock::CollectionLock collLock(opCtx, ns, MODE_IX); @@ -627,7 +627,7 @@ bool insertBatchAndHandleErrors(OperationContext* opCtx, ServerWriteConcernMetrics::get(opCtx)->recordWriteConcernForInsert( opCtx->getWriteConcern()); try { - writeConflictRetry(opCtx, "insert", nss.ns(), [&] { + writeConflictRetry(opCtx, "insert", nss, [&] { try { if (!collection) acquireCollection(); diff --git a/src/mongo/db/pipeline/change_stream_expired_pre_image_remover_test.cpp b/src/mongo/db/pipeline/change_stream_expired_pre_image_remover_test.cpp index 4ffb3fccf1e..6c1c20f156c 100644 --- a/src/mongo/db/pipeline/change_stream_expired_pre_image_remover_test.cpp +++ b/src/mongo/db/pipeline/change_stream_expired_pre_image_remover_test.cpp @@ -181,7 +181,7 @@ protected: NamespaceString arbitraryNss = NamespaceString::createNamespaceString_forTest("test", "coll"); - writeConflictRetry(opCtx, "createCollection", arbitraryNss.ns(), [&] { + writeConflictRetry(opCtx, "createCollection", arbitraryNss, [&] { WriteUnitOfWork wunit(opCtx); AutoGetCollection collRaii(opCtx, arbitraryNss, MODE_X); invariant(!collRaii); diff --git a/src/mongo/db/pipeline/process_interface/common_mongod_process_interface.cpp b/src/mongo/db/pipeline/process_interface/common_mongod_process_interface.cpp index d90d2049c7b..576f6015eba 100644 --- a/src/mongo/db/pipeline/process_interface/common_mongod_process_interface.cpp +++ b/src/mongo/db/pipeline/process_interface/common_mongod_process_interface.cpp @@ -836,7 +836,7 @@ void CommonMongodProcessInterface::writeRecordsToRecordStore( const std::vector<Timestamp>& ts) const { tassert(5643012, "Attempted to write to record store with nullptr", records); assertIgnorePrepareConflictsBehavior(expCtx); - writeConflictRetry(expCtx->opCtx, "MPI::writeRecordsToRecordStore", expCtx->ns.ns(), [&] { + writeConflictRetry(expCtx->opCtx, "MPI::writeRecordsToRecordStore", expCtx->ns, [&] { Lock::GlobalLock lk(expCtx->opCtx, MODE_IS); WriteUnitOfWork wuow(expCtx->opCtx); auto writeResult = rs->insertRecords(expCtx->opCtx, records, ts); @@ -866,7 +866,7 @@ Document CommonMongodProcessInterface::readRecordFromRecordStore( void CommonMongodProcessInterface::deleteRecordFromRecordStore( const boost::intrusive_ptr<ExpressionContext>& expCtx, RecordStore* rs, RecordId rID) const { assertIgnorePrepareConflictsBehavior(expCtx); - writeConflictRetry(expCtx->opCtx, "MPI::deleteFromRecordStore", expCtx->ns.ns(), [&] { + writeConflictRetry(expCtx->opCtx, "MPI::deleteFromRecordStore", expCtx->ns, [&] { Lock::GlobalLock lk(expCtx->opCtx, MODE_IS); WriteUnitOfWork wuow(expCtx->opCtx); rs->deleteRecord(expCtx->opCtx, rID); @@ -877,7 +877,7 @@ void CommonMongodProcessInterface::deleteRecordFromRecordStore( void CommonMongodProcessInterface::truncateRecordStore( const boost::intrusive_ptr<ExpressionContext>& expCtx, RecordStore* rs) const { assertIgnorePrepareConflictsBehavior(expCtx); - writeConflictRetry(expCtx->opCtx, "MPI::truncateRecordStore", expCtx->ns.ns(), [&] { + writeConflictRetry(expCtx->opCtx, "MPI::truncateRecordStore", expCtx->ns, [&] { Lock::GlobalLock lk(expCtx->opCtx, MODE_IS); WriteUnitOfWork wuow(expCtx->opCtx); auto status = rs->truncate(expCtx->opCtx); diff --git a/src/mongo/db/pipeline/process_interface/non_shardsvr_process_interface.cpp b/src/mongo/db/pipeline/process_interface/non_shardsvr_process_interface.cpp index f153109e0e6..ff7a23aec23 100644 --- a/src/mongo/db/pipeline/process_interface/non_shardsvr_process_interface.cpp +++ b/src/mongo/db/pipeline/process_interface/non_shardsvr_process_interface.cpp @@ -170,7 +170,7 @@ void NonShardServerProcessInterface::createIndexesOnEmptyCollection( AutoGetCollection autoColl(opCtx, ns, MODE_X); CollectionWriter collection(opCtx, autoColl); writeConflictRetry( - opCtx, "CommonMongodProcessInterface::createIndexesOnEmptyCollection", ns.ns(), [&] { + opCtx, "CommonMongodProcessInterface::createIndexesOnEmptyCollection", ns, [&] { uassert(ErrorCodes::DatabaseDropPending, str::stream() << "The database is in the process of being dropped " << ns.dbName().toStringForErrorMsg(), diff --git a/src/mongo/db/pipeline/window_function/spillable_cache_test.cpp b/src/mongo/db/pipeline/window_function/spillable_cache_test.cpp index 7599842de57..21f40b353c7 100644 --- a/src/mongo/db/pipeline/window_function/spillable_cache_test.cpp +++ b/src/mongo/db/pipeline/window_function/spillable_cache_test.cpp @@ -56,7 +56,7 @@ public: std::vector<Record>* records, const std::vector<Timestamp>& ts) const override { - writeConflictRetry(expCtx->opCtx, "MPI::writeRecordsToRecordStore", expCtx->ns.ns(), [&] { + writeConflictRetry(expCtx->opCtx, "MPI::writeRecordsToRecordStore", expCtx->ns, [&] { AutoGetCollection autoColl(expCtx->opCtx, expCtx->ns, MODE_IX); WriteUnitOfWork wuow(expCtx->opCtx); auto writeResult = rs->insertRecords(expCtx->opCtx, records, ts); diff --git a/src/mongo/db/query/wildcard_multikey_paths.cpp b/src/mongo/db/query/wildcard_multikey_paths.cpp index fe87babee8d..4232bc5177f 100644 --- a/src/mongo/db/query/wildcard_multikey_paths.cpp +++ b/src/mongo/db/query/wildcard_multikey_paths.cpp @@ -142,7 +142,7 @@ static std::set<FieldRef> getWildcardMultikeyPathSetHelper(const WildcardAccessM const IndexBounds& indexBounds, MultikeyMetadataAccessStats* stats) { return writeConflictRetry( - opCtx, "wildcard multikey path retrieval", "", [&]() -> std::set<FieldRef> { + opCtx, "wildcard multikey path retrieval", NamespaceString(), [&]() -> std::set<FieldRef> { stats->numSeeks = 0; stats->keysExamined = 0; auto cursor = wam->newCursor(opCtx); @@ -355,7 +355,7 @@ static std::pair<BSONObj, BSONObj> buildMetadataKeyRange(const BSONObj& keyPatte std::set<FieldRef> getWildcardMultikeyPathSet(const WildcardAccessMethod* wam, OperationContext* opCtx, MultikeyMetadataAccessStats* stats) { - return writeConflictRetry(opCtx, "wildcard multikey path retrieval", "", [&]() { + return writeConflictRetry(opCtx, "wildcard multikey path retrieval", NamespaceString(), [&]() { tassert(7354611, "stats must be non-null", stats); stats->numSeeks = 0; stats->keysExamined = 0; diff --git a/src/mongo/db/read_concern_mongod.cpp b/src/mongo/db/read_concern_mongod.cpp index 27fd824a7d5..613d0609d02 100644 --- a/src/mongo/db/read_concern_mongod.cpp +++ b/src/mongo/db/read_concern_mongod.cpp @@ -489,10 +489,7 @@ Status waitForLinearizableReadConcernImpl(OperationContext* opCtx, } writeConflictRetry( - opCtx, - "waitForLinearizableReadConcern", - NamespaceString::kRsOplogNamespace.ns(), - [&opCtx] { + opCtx, "waitForLinearizableReadConcern", NamespaceString::kRsOplogNamespace, [&opCtx] { WriteUnitOfWork uow(opCtx); opCtx->getClient()->getServiceContext()->getOpObserver()->onOpMessage( opCtx, diff --git a/src/mongo/db/repl/apply_ops.cpp b/src/mongo/db/repl/apply_ops.cpp index 775e4dcb0b1..374ebb27ae9 100644 --- a/src/mongo/db/repl/apply_ops.cpp +++ b/src/mongo/db/repl/apply_ops.cpp @@ -99,7 +99,7 @@ Status _applyOps(OperationContext* opCtx, status = writeConflictRetry( opCtx, "applyOps", - nss.ns(), + nss, [opCtx, nss, opObj, opType, alwaysUpsert, oplogApplicationMode, &info, &dbName] { BSONObjBuilder builder; // Remove 'hash' field if it is set. A bit slow as it rebuilds the object. diff --git a/src/mongo/db/repl/bgsync.cpp b/src/mongo/db/repl/bgsync.cpp index 0cb2c057f8f..544642af7f6 100644 --- a/src/mongo/db/repl/bgsync.cpp +++ b/src/mongo/db/repl/bgsync.cpp @@ -969,7 +969,7 @@ OpTime BackgroundSync::_readLastAppliedOpTime(OperationContext* opCtx) { BSONObj oplogEntry; try { bool success = writeConflictRetry( - opCtx, "readLastAppliedOpTime", NamespaceString::kRsOplogNamespace.ns(), [&] { + opCtx, "readLastAppliedOpTime", NamespaceString::kRsOplogNamespace, [&] { return Helpers::getLast(opCtx, NamespaceString::kRsOplogNamespace, oplogEntry); }); diff --git a/src/mongo/db/repl/change_stream_oplog_notification.cpp b/src/mongo/db/repl/change_stream_oplog_notification.cpp index 518c4560785..1c8fb6a78c2 100644 --- a/src/mongo/db/repl/change_stream_oplog_notification.cpp +++ b/src/mongo/db/repl/change_stream_oplog_notification.cpp @@ -43,7 +43,7 @@ namespace { void insertOplogEntry(OperationContext* opCtx, repl::MutableOplogEntry&& oplogEntry, StringData opStr) { - writeConflictRetry(opCtx, opStr, NamespaceString::kRsOplogNamespace.ns(), [&] { + writeConflictRetry(opCtx, opStr, NamespaceString::kRsOplogNamespace, [&] { AutoGetOplog oplogWrite(opCtx, OplogAccessMode::kWrite); WriteUnitOfWork wunit(opCtx); const auto& oplogOpTime = repl::logOp(opCtx, &oplogEntry); diff --git a/src/mongo/db/repl/collection_bulk_loader_impl.cpp b/src/mongo/db/repl/collection_bulk_loader_impl.cpp index c3ff592161c..f0fbf19494f 100644 --- a/src/mongo/db/repl/collection_bulk_loader_impl.cpp +++ b/src/mongo/db/repl/collection_bulk_loader_impl.cpp @@ -83,7 +83,7 @@ Status CollectionBulkLoaderImpl::init(const std::vector<BSONObj>& secondaryIndex return writeConflictRetry( _opCtx.get(), "CollectionBulkLoader::init", - _acquisition.nss().ns(), + _acquisition.nss(), [&secondaryIndexSpecs, this] { WriteUnitOfWork wuow(_opCtx.get()); // All writes in CollectionBulkLoaderImpl should be unreplicated. @@ -137,7 +137,7 @@ Status CollectionBulkLoaderImpl::_insertDocumentsForUncappedCollection( while (iter != end) { std::vector<RecordId> locs; Status status = writeConflictRetry( - _opCtx.get(), "CollectionBulkLoaderImpl/insertDocumentsUncapped", _nss.ns(), [&] { + _opCtx.get(), "CollectionBulkLoaderImpl/insertDocumentsUncapped", _nss, [&] { WriteUnitOfWork wunit(_opCtx.get()); auto insertIter = iter; int bytesInBlock = 0; @@ -170,7 +170,7 @@ Status CollectionBulkLoaderImpl::_insertDocumentsForUncappedCollection( // Inserts index entries into the external sorter. This will not update pre-existing // indexes. Wrap this in a WUOW since the index entry insertion may modify the durable // record store which can throw a write conflict exception. - status = writeConflictRetry(_opCtx.get(), "_addDocumentToIndexBlocks", _nss.ns(), [&] { + status = writeConflictRetry(_opCtx.get(), "_addDocumentToIndexBlocks", _nss, [&] { WriteUnitOfWork wunit(_opCtx.get()); for (size_t index = 0; index < locs.size(); ++index) { status = _addDocumentToIndexBlocks(*iter++, locs.at(index)); @@ -195,7 +195,7 @@ Status CollectionBulkLoaderImpl::_insertDocumentsForCappedCollection( for (auto iter = begin; iter != end; ++iter) { const auto& doc = *iter; Status status = writeConflictRetry( - _opCtx.get(), "CollectionBulkLoaderImpl/insertDocumentsCapped", _nss.ns(), [&] { + _opCtx.get(), "CollectionBulkLoaderImpl/insertDocumentsCapped", _nss, [&] { WriteUnitOfWork wunit(_opCtx.get()); // For capped collections, we use regular insertDocument, which // will update pre-existing indexes. @@ -247,8 +247,8 @@ Status CollectionBulkLoaderImpl::commit() { invariant(_secondaryIndexesBlock->checkConstraints(_opCtx.get(), _acquisition.getCollectionPtr())); - status = writeConflictRetry( - _opCtx.get(), "CollectionBulkLoaderImpl::commit", _nss.ns(), [this] { + status = + writeConflictRetry(_opCtx.get(), "CollectionBulkLoaderImpl::commit", _nss, [this] { WriteUnitOfWork wunit(_opCtx.get()); CollectionWriter collWriter(_opCtx.get(), &_acquisition); auto status = _secondaryIndexesBlock->commit( @@ -272,7 +272,7 @@ Status CollectionBulkLoaderImpl::commit() { auto status = _idIndexBlock->dumpInsertsFromBulk( _opCtx.get(), _acquisition.getCollectionPtr(), [&](const RecordId& rid) { writeConflictRetry( - _opCtx.get(), "CollectionBulkLoaderImpl::commit", _nss.ns(), [this, &rid] { + _opCtx.get(), "CollectionBulkLoaderImpl::commit", _nss, [this, &rid] { WriteUnitOfWork wunit(_opCtx.get()); auto doc = _acquisition.getCollectionPtr()->docFor(_opCtx.get(), rid); @@ -324,8 +324,8 @@ Status CollectionBulkLoaderImpl::commit() { // Commit the _id index, there won't be any documents with duplicate _ids as they were // deleted prior to this. - status = writeConflictRetry( - _opCtx.get(), "CollectionBulkLoaderImpl::commit", _nss.ns(), [this] { + status = + writeConflictRetry(_opCtx.get(), "CollectionBulkLoaderImpl::commit", _nss, [this] { WriteUnitOfWork wunit(_opCtx.get()); CollectionWriter collWriter(_opCtx.get(), &_acquisition); auto status = diff --git a/src/mongo/db/repl/noop_writer.cpp b/src/mongo/db/repl/noop_writer.cpp index f2898454371..98755249180 100644 --- a/src/mongo/db/repl/noop_writer.cpp +++ b/src/mongo/db/repl/noop_writer.cpp @@ -195,13 +195,12 @@ void NoopWriter::_writeNoop(OperationContext* opCtx) { "Writing noop to oplog as there has been no writes to this replica set " "within write interval", "writeInterval"_attr = _writeInterval); - writeConflictRetry( - opCtx, "writeNoop", NamespaceString::kRsOplogNamespace.ns(), [&opCtx] { - WriteUnitOfWork uow(opCtx); - opCtx->getClient()->getServiceContext()->getOpObserver()->onOpMessage(opCtx, - kMsgObj); - uow.commit(); - }); + writeConflictRetry(opCtx, "writeNoop", NamespaceString::kRsOplogNamespace, [&opCtx] { + WriteUnitOfWork uow(opCtx); + opCtx->getClient()->getServiceContext()->getOpObserver()->onOpMessage(opCtx, + kMsgObj); + uow.commit(); + }); } } diff --git a/src/mongo/db/repl/oplog.cpp b/src/mongo/db/repl/oplog.cpp index 24a8d4d8b25..908e64d878c 100644 --- a/src/mongo/db/repl/oplog.cpp +++ b/src/mongo/db/repl/oplog.cpp @@ -733,7 +733,7 @@ void createOplog(OperationContext* opCtx, options.cappedSize = sz; options.autoIndexId = CollectionOptions::NO; - writeConflictRetry(opCtx, "createCollection", oplogCollectionName.ns(), [&] { + writeConflictRetry(opCtx, "createCollection", oplogCollectionName, [&] { WriteUnitOfWork uow(opCtx); invariant(ctx.db()->createCollection(opCtx, oplogCollectionName, options)); acquireOplogCollectionForLogging(opCtx); @@ -1447,7 +1447,7 @@ Status applyOperation_inlock(OperationContext* opCtx, "mode should be in initialSync or recovering", mode == OplogApplication::Mode::kInitialSync || OplogApplication::inRecovering(mode)); - writeConflictRetry(opCtx, "applyOps_imageInvalidation", op.getNss().toString(), [&] { + writeConflictRetry(opCtx, "applyOps_imageInvalidation", op.getNss(), [&] { WriteUnitOfWork wuow(opCtx); bool upsertConfigImage = true; writeToImageCollection(opCtx, @@ -1739,8 +1739,7 @@ Status applyOperation_inlock(OperationContext* opCtx, request.setUpsert(); request.setFromOplogApplication(true); - const StringData ns = op.getNss().ns(); - writeConflictRetry(opCtx, "applyOps_upsert", ns, [&] { + writeConflictRetry(opCtx, "applyOps_upsert", op.getNss(), [&] { WriteUnitOfWork wuow(opCtx); // If `haveWrappingWriteUnitOfWork` is true, do not timestamp the write. if (assignOperationTimestamp && timestamp != Timestamp::min()) { @@ -1825,7 +1824,6 @@ Status applyOperation_inlock(OperationContext* opCtx, timestamp = op.getTimestamp(); } - const StringData ns = op.getNss().ns(); // Operations that were part of a retryable findAndModify have two formats for // replicating pre/post images. The classic format has primaries writing explicit noop // oplog entries that contain the necessary details for reconstructed a response to a @@ -1856,7 +1854,7 @@ Status applyOperation_inlock(OperationContext* opCtx, // to insert a document. We only have to make sure we didn't race with an insert that // won, but with an earlier `ts`. bool upsertConfigImage = true; - auto status = writeConflictRetry(opCtx, "applyOps_update", ns, [&] { + auto status = writeConflictRetry(opCtx, "applyOps_update", op.getNss(), [&] { WriteUnitOfWork wuow(opCtx); if (timestamp != Timestamp::min()) { uassertStatusOK(opCtx->recoveryUnit()->setTimestamp(timestamp)); @@ -2015,9 +2013,8 @@ Status applyOperation_inlock(OperationContext* opCtx, // Determine if a change stream pre-image has to be recorded for the oplog entry. const bool recordChangeStreamPreImage = shouldRecordChangeStreamPreImage(); - const StringData ns = op.getNss().ns(); bool upsertConfigImage = true; - writeConflictRetry(opCtx, "applyOps_delete", ns, [&] { + writeConflictRetry(opCtx, "applyOps_delete", op.getNss(), [&] { WriteUnitOfWork wuow(opCtx); if (timestamp != Timestamp::min()) { uassertStatusOK(opCtx->recoveryUnit()->setTimestamp(timestamp)); @@ -2138,7 +2135,7 @@ Status applyOperation_inlock(OperationContext* opCtx, timestamp = op.getTimestamp(); } - writeConflictRetry(opCtx, "applyOps_insertGlobalIndexKey", collection->ns().ns(), [&] { + writeConflictRetry(opCtx, "applyOps_insertGlobalIndexKey", collection->ns(), [&] { WriteUnitOfWork wuow(opCtx); if (timestamp != Timestamp::min()) { uassertStatusOK(opCtx->recoveryUnit()->setTimestamp(timestamp)); @@ -2162,7 +2159,7 @@ Status applyOperation_inlock(OperationContext* opCtx, timestamp = op.getTimestamp(); } - writeConflictRetry(opCtx, "applyOps_deleteGlobalIndexKey", collection->ns().ns(), [&] { + writeConflictRetry(opCtx, "applyOps_deleteGlobalIndexKey", collection->ns(), [&] { WriteUnitOfWork wuow(opCtx); if (timestamp != Timestamp::min()) { uassertStatusOK(opCtx->recoveryUnit()->setTimestamp(timestamp)); diff --git a/src/mongo/db/repl/oplog_applier_impl_test_fixture.cpp b/src/mongo/db/repl/oplog_applier_impl_test_fixture.cpp index 01fefaef308..4116eb8e15b 100644 --- a/src/mongo/db/repl/oplog_applier_impl_test_fixture.cpp +++ b/src/mongo/db/repl/oplog_applier_impl_test_fixture.cpp @@ -507,7 +507,7 @@ CollectionOptions createRecordChangeStreamPreAndPostImagesCollectionOptions() { void createCollection(OperationContext* opCtx, const NamespaceString& nss, const CollectionOptions& options) { - writeConflictRetry(opCtx, "createCollection", nss.ns(), [&] { + writeConflictRetry(opCtx, "createCollection", nss, [&] { Lock::DBLock dbLk(opCtx, nss.dbName(), MODE_IX); Lock::CollectionLock collLk(opCtx, nss, MODE_X); diff --git a/src/mongo/db/repl/oplog_applier_utils.cpp b/src/mongo/db/repl/oplog_applier_utils.cpp index b1364cade01..7ea9ccf5538 100644 --- a/src/mongo/db/repl/oplog_applier_utils.cpp +++ b/src/mongo/db/repl/oplog_applier_utils.cpp @@ -406,111 +406,110 @@ Status OplogApplierUtils::applyOplogEntryOrGroupedInsertsCommon( incrementOpsAppliedStats(); return Status::OK(); } else if (DurableOplogEntry::isCrudOpType(opType)) { - auto status = - writeConflictRetry(opCtx, "applyOplogEntryOrGroupedInserts_CRUD", nss.ns(), [&] { - // Need to throw instead of returning a status for it to be properly ignored. + auto status = writeConflictRetry(opCtx, "applyOplogEntryOrGroupedInserts_CRUD", nss, [&] { + // Need to throw instead of returning a status for it to be properly ignored. + try { + boost::optional<ScopedCollectionAcquisition> coll; + Database* db = nullptr; + + // If the collection UUID does not resolve, acquire the collection using the + // namespace. This is so we reach `applyOperation_inlock` below and invalidate + // the preimage / postimage for the op if applicable. + + // TODO SERVER-41371 / SERVER-73661 this code is difficult to maintain and + // needs to be done everywhere this situation is possible. We should try + // to consolidate this into applyOperation_inlock. try { - boost::optional<ScopedCollectionAcquisition> coll; - Database* db = nullptr; - - // If the collection UUID does not resolve, acquire the collection using the - // namespace. This is so we reach `applyOperation_inlock` below and invalidate - // the preimage / postimage for the op if applicable. - - // TODO SERVER-41371 / SERVER-73661 this code is difficult to maintain and - // needs to be done everywhere this situation is possible. We should try - // to consolidate this into applyOperation_inlock. - try { + coll.emplace( + acquireCollection(opCtx, + {getNsOrUUID(nss, *op), + AcquisitionPrerequisites::kPretendUnsharded, + repl::ReadConcernArgs::get(opCtx), + AcquisitionPrerequisites::kWrite}, + fixLockModeForSystemDotViewsChanges(nss, MODE_IX))); + + AutoGetDb autoDb(opCtx, coll->nss().dbName(), MODE_IX); + db = autoDb.getDb(); + } catch (ExceptionFor<ErrorCodes::NamespaceNotFound>& ex) { + if (!isDataConsistent) { coll.emplace( acquireCollection(opCtx, - {getNsOrUUID(nss, *op), + {nss, AcquisitionPrerequisites::kPretendUnsharded, repl::ReadConcernArgs::get(opCtx), AcquisitionPrerequisites::kWrite}, fixLockModeForSystemDotViewsChanges(nss, MODE_IX))); AutoGetDb autoDb(opCtx, coll->nss().dbName(), MODE_IX); - db = autoDb.getDb(); - } catch (ExceptionFor<ErrorCodes::NamespaceNotFound>& ex) { - if (!isDataConsistent) { - coll.emplace(acquireCollection( - opCtx, - {nss, - AcquisitionPrerequisites::kPretendUnsharded, - repl::ReadConcernArgs::get(opCtx), - AcquisitionPrerequisites::kWrite}, - fixLockModeForSystemDotViewsChanges(nss, MODE_IX))); - - AutoGetDb autoDb(opCtx, coll->nss().dbName(), MODE_IX); - db = autoDb.ensureDbExists(opCtx); - } else { - throw ex; - } + db = autoDb.ensureDbExists(opCtx); + } else { + throw ex; } + } - invariant(coll); - uassert(ErrorCodes::NamespaceNotFound, - str::stream() << "missing database (" - << nss.dbName().toStringForErrorMsg() << ")", - db); - OldClientContext ctx(opCtx, coll->nss(), db); - - // We convert updates to upserts in secondary mode when the - // oplogApplicationEnforcesSteadyStateConstraints parameter is false, to avoid - // failing on the constraint that updates in steady state mode always update - // an existing document. - // - // In initial sync and recovery modes we always ignore errors about missing - // documents on update, so there is no reason to convert the updates to upsert. - - bool shouldAlwaysUpsert = !oplogApplicationEnforcesSteadyStateConstraints && - oplogApplicationMode == OplogApplication::Mode::kSecondary; - Status status = applyOperation_inlock(opCtx, - *coll, - entryOrGroupedInserts, - shouldAlwaysUpsert, - oplogApplicationMode, - isDataConsistent, - incrementOpsAppliedStats); - if (!status.isOK() && status.code() == ErrorCodes::WriteConflict) { - throwWriteConflictException( - str::stream() << "WriteConflict caught when applying operation." - << " Original error: " << status.reason()); - } - return status; - } catch (ExceptionFor<ErrorCodes::NamespaceNotFound>& ex) { - // This can happen in initial sync or recovery modes (when a delete of the - // namespace appears later in the oplog), but we will ignore it in the caller. - // - // When we're not enforcing steady-state constraints, the error is ignored - // only for deletes, on the grounds that deleting from a non-existent collection - // is a no-op. - if (opType == OpTypeEnum::kDelete && - !oplogApplicationEnforcesSteadyStateConstraints && - oplogApplicationMode == OplogApplication::Mode::kSecondary) { - if (opCounters) { - const auto& opObj = redact(op->toBSONForLogging()); - opCounters->gotDeleteFromMissingNamespace(); - logOplogConstraintViolation( - opCtx, - op->getNss(), - OplogConstraintViolationEnum::kDeleteOnMissingNs, - "delete", - opObj, - boost::none /* status */); - } - return Status::OK(); + invariant(coll); + uassert(ErrorCodes::NamespaceNotFound, + str::stream() + << "missing database (" << nss.dbName().toStringForErrorMsg() << ")", + db); + OldClientContext ctx(opCtx, coll->nss(), db); + + // We convert updates to upserts in secondary mode when the + // oplogApplicationEnforcesSteadyStateConstraints parameter is false, to avoid + // failing on the constraint that updates in steady state mode always update + // an existing document. + // + // In initial sync and recovery modes we always ignore errors about missing + // documents on update, so there is no reason to convert the updates to upsert. + + bool shouldAlwaysUpsert = !oplogApplicationEnforcesSteadyStateConstraints && + oplogApplicationMode == OplogApplication::Mode::kSecondary; + Status status = applyOperation_inlock(opCtx, + *coll, + entryOrGroupedInserts, + shouldAlwaysUpsert, + oplogApplicationMode, + isDataConsistent, + incrementOpsAppliedStats); + if (!status.isOK() && status.code() == ErrorCodes::WriteConflict) { + throwWriteConflictException(str::stream() + << "WriteConflict caught when applying operation." + << " Original error: " << status.reason()); + } + return status; + } catch (ExceptionFor<ErrorCodes::NamespaceNotFound>& ex) { + // This can happen in initial sync or recovery modes (when a delete of the + // namespace appears later in the oplog), but we will ignore it in the caller. + // + // When we're not enforcing steady-state constraints, the error is ignored + // only for deletes, on the grounds that deleting from a non-existent collection + // is a no-op. + if (opType == OpTypeEnum::kDelete && + !oplogApplicationEnforcesSteadyStateConstraints && + oplogApplicationMode == OplogApplication::Mode::kSecondary) { + if (opCounters) { + const auto& opObj = redact(op->toBSONForLogging()); + opCounters->gotDeleteFromMissingNamespace(); + logOplogConstraintViolation( + opCtx, + op->getNss(), + OplogConstraintViolationEnum::kDeleteOnMissingNs, + "delete", + opObj, + boost::none /* status */); } - - ex.addContext(str::stream() << "Failed to apply operation: " - << redact(entryOrGroupedInserts.toBSON())); - throw; + return Status::OK(); } - }); + + ex.addContext(str::stream() << "Failed to apply operation: " + << redact(entryOrGroupedInserts.toBSON())); + throw; + } + }); return status; } else if (opType == OpTypeEnum::kCommand) { auto status = - writeConflictRetry(opCtx, "applyOplogEntryOrGroupedInserts_command", nss.ns(), [&] { + writeConflictRetry(opCtx, "applyOplogEntryOrGroupedInserts_command", nss, [&] { // A special case apply for commands to avoid implicit database creation. Status status = applyCommand_inlock(opCtx, op, oplogApplicationMode); incrementOpsAppliedStats(); diff --git a/src/mongo/db/repl/replication_consistency_markers_impl_test.cpp b/src/mongo/db/repl/replication_consistency_markers_impl_test.cpp index cf24bb49a4c..2c0dd7b7e33 100644 --- a/src/mongo/db/repl/replication_consistency_markers_impl_test.cpp +++ b/src/mongo/db/repl/replication_consistency_markers_impl_test.cpp @@ -66,7 +66,7 @@ NamespaceString kInitialSyncIdNss = * Returns min valid document. */ BSONObj getMinValidDocument(OperationContext* opCtx, const NamespaceString& minValidNss) { - return writeConflictRetry(opCtx, "getMinValidDocument", minValidNss.ns(), [opCtx, minValidNss] { + return writeConflictRetry(opCtx, "getMinValidDocument", minValidNss, [opCtx, minValidNss] { Lock::DBLock dblk(opCtx, minValidNss.dbName(), MODE_IS); Lock::CollectionLock lk(opCtx, minValidNss, MODE_IS); BSONObj mv; diff --git a/src/mongo/db/repl/replication_coordinator_external_state_impl.cpp b/src/mongo/db/repl/replication_coordinator_external_state_impl.cpp index 1863add879c..c91b38e4847 100644 --- a/src/mongo/db/repl/replication_coordinator_external_state_impl.cpp +++ b/src/mongo/db/repl/replication_coordinator_external_state_impl.cpp @@ -420,7 +420,7 @@ Status ReplicationCoordinatorExternalStateImpl::initializeReplSetStorage(Operati writeConflictRetry(opCtx, "initiate oplog entry", - NamespaceString::kRsOplogNamespace.toString(), + NamespaceString::kRsOplogNamespace, [this, &opCtx, &config] { // Permit writing to the oplog before we step up to primary. AllowNonLocalWritesBlock allowNonLocalWrites(opCtx); @@ -506,15 +506,16 @@ OpTime ReplicationCoordinatorExternalStateImpl::onTransitionToPrimary(OperationC _replicationProcess->getConsistencyMarkers()->clearAppliedThrough(opCtx); LOGV2(6015309, "Logging transition to primary to oplog on stepup"); - writeConflictRetry(opCtx, "logging transition to primary to oplog", "local.oplog.rs", [&] { - AutoGetOplog oplogWrite(opCtx, OplogAccessMode::kWrite); - WriteUnitOfWork wuow(opCtx); - opCtx->getClient()->getServiceContext()->getOpObserver()->onOpMessage( - opCtx, - BSON(ReplicationCoordinator::newPrimaryMsgField - << ReplicationCoordinator::newPrimaryMsg)); - wuow.commit(); - }); + writeConflictRetry( + opCtx, "logging transition to primary to oplog", NamespaceString::kRsOplogNamespace, [&] { + AutoGetOplog oplogWrite(opCtx, OplogAccessMode::kWrite); + WriteUnitOfWork wuow(opCtx); + opCtx->getClient()->getServiceContext()->getOpObserver()->onOpMessage( + opCtx, + BSON(ReplicationCoordinator::newPrimaryMsgField + << ReplicationCoordinator::newPrimaryMsg)); + wuow.commit(); + }); const auto loadLastOpTimeAndWallTimeResult = loadLastOpTimeAndWallTime(opCtx); fassert(28665, loadLastOpTimeAndWallTimeResult); auto opTimeToReturn = loadLastOpTimeAndWallTimeResult.getValue().opTime; @@ -571,10 +572,7 @@ StatusWith<BSONObj> ReplicationCoordinatorExternalStateImpl::loadLocalConfigDocu OperationContext* opCtx) { try { return writeConflictRetry( - opCtx, - "load replica set config", - NamespaceString::kSystemReplSetNamespace.ns(), - [opCtx] { + opCtx, "load replica set config", NamespaceString::kSystemReplSetNamespace, [opCtx] { BSONObj config; if (!Helpers::getSingleton( opCtx, NamespaceString::kSystemReplSetNamespace, config)) { @@ -595,7 +593,7 @@ Status ReplicationCoordinatorExternalStateImpl::storeLocalConfigDocument(Operati bool writeOplog) { try { writeConflictRetry( - opCtx, "save replica set config", NamespaceString::kSystemReplSetNamespace.ns(), [&] { + opCtx, "save replica set config", NamespaceString::kSystemReplSetNamespace, [&] { { // Writes to 'local.system.replset' must be untimestamped. WriteUnitOfWork wuow(opCtx); @@ -634,7 +632,7 @@ Status ReplicationCoordinatorExternalStateImpl::storeLocalConfigDocument(Operati Status ReplicationCoordinatorExternalStateImpl::replaceLocalConfigDocument( OperationContext* opCtx, const BSONObj& config) try { writeConflictRetry( - opCtx, "replace replica set config", NamespaceString::kSystemReplSetNamespace.ns(), [&] { + opCtx, "replace replica set config", NamespaceString::kSystemReplSetNamespace, [&] { WriteUnitOfWork wuow(opCtx); auto coll = acquireCollection(opCtx, @@ -668,7 +666,7 @@ Status ReplicationCoordinatorExternalStateImpl::createLocalLastVoteCollection( try { writeConflictRetry(opCtx, "create initial replica set lastVote", - NamespaceString::kLastVoteNamespace.toString(), + NamespaceString::kLastVoteNamespace, [opCtx] { auto coll = acquireCollection( opCtx, @@ -697,10 +695,7 @@ StatusWith<LastVote> ReplicationCoordinatorExternalStateImpl::loadLocalLastVoteD OperationContext* opCtx) { try { return writeConflictRetry( - opCtx, - "load replica set lastVote", - NamespaceString::kLastVoteNamespace.toString(), - [opCtx] { + opCtx, "load replica set lastVote", NamespaceString::kLastVoteNamespace, [opCtx] { BSONObj lastVoteObj; if (!Helpers::getSingleton( opCtx, NamespaceString::kLastVoteNamespace, lastVoteObj)) { @@ -740,10 +735,7 @@ Status ReplicationCoordinatorExternalStateImpl::storeLocalLastVoteDocument( noInterrupt.emplace(opCtx->lockState()); Status status = writeConflictRetry( - opCtx, - "save replica set lastVote", - NamespaceString::kLastVoteNamespace.toString(), - [&] { + opCtx, "save replica set lastVote", NamespaceString::kLastVoteNamespace, [&] { // Writes to non-replicated collections do not need concurrency control with the // OplogApplier that never accesses them. Skip taking the PBWM. ShouldNotConflictWithSecondaryBatchApplicationBlock shouldNotConflictBlock( @@ -816,10 +808,9 @@ StatusWith<OpTimeAndWallTime> ReplicationCoordinatorExternalStateImpl::loadLastO BSONObj oplogEntry; - if (!writeConflictRetry( - opCtx, "Load last opTime", NamespaceString::kRsOplogNamespace.ns(), [&] { - return Helpers::getLast(opCtx, NamespaceString::kRsOplogNamespace, oplogEntry); - })) { + if (!writeConflictRetry(opCtx, "Load last opTime", NamespaceString::kRsOplogNamespace, [&] { + return Helpers::getLast(opCtx, NamespaceString::kRsOplogNamespace, oplogEntry); + })) { return StatusWith<OpTimeAndWallTime>( ErrorCodes::NoMatchingDocument, str::stream() << "Did not find any entries in " diff --git a/src/mongo/db/repl/rollback_impl.cpp b/src/mongo/db/repl/rollback_impl.cpp index 701f1de097d..d87cec5f430 100644 --- a/src/mongo/db/repl/rollback_impl.cpp +++ b/src/mongo/db/repl/rollback_impl.cpp @@ -529,7 +529,7 @@ void RollbackImpl::_restoreTxnsTableEntryFromRetryableWrites(OperationContext* o sessionTxnRecord.setLastWriteDate(wallClockTime); } const auto nss = NamespaceString::kSessionTransactionsTableNamespace; - writeConflictRetry(opCtx, "updateSessionTransactionsTableInRollback", nss.ns(), [&] { + writeConflictRetry(opCtx, "updateSessionTransactionsTableInRollback", nss, [&] { opCtx->recoveryUnit()->allowOneUntimestampedWrite(); auto collection = acquireCollection(opCtx, diff --git a/src/mongo/db/repl/rs_rollback.cpp b/src/mongo/db/repl/rs_rollback.cpp index 3a398b1b41e..c5bde73e29c 100644 --- a/src/mongo/db/repl/rs_rollback.cpp +++ b/src/mongo/db/repl/rs_rollback.cpp @@ -1704,10 +1704,7 @@ void syncFixUp(OperationContext* opCtx, if (!loc.isNull()) { try { writeConflictRetry( - opCtx, - "cappedTruncateAfter", - collection.nss().ns(), - [&] { + opCtx, "cappedTruncateAfter", collection.nss(), [&] { collection_internal::cappedTruncateAfter( opCtx, collection.getCollectionPtr(), @@ -1719,7 +1716,7 @@ void syncFixUp(OperationContext* opCtx, // hack: need to just make cappedTruncate do this... CollectionWriter collectionWriter(opCtx, &collection); writeConflictRetry( - opCtx, "truncate", collection.nss().ns(), [&] { + opCtx, "truncate", collection.nss(), [&] { WriteUnitOfWork wunit(opCtx); uassertStatusOK( collectionWriter diff --git a/src/mongo/db/repl/shard_merge_recipient_service.cpp b/src/mongo/db/repl/shard_merge_recipient_service.cpp index 1cb67e16f91..a070989a745 100644 --- a/src/mongo/db/repl/shard_merge_recipient_service.cpp +++ b/src/mongo/db/repl/shard_merge_recipient_service.cpp @@ -1336,7 +1336,7 @@ void ShardMergeRecipientService::Instance::_processCommittedTransactionEntry(con AutoGetOplog oplogWrite(opCtx, OplogAccessMode::kWrite); writeConflictRetry( - opCtx, "writeDonorCommittedTxnEntry", NamespaceString::kRsOplogNamespace.ns(), [&] { + opCtx, "writeDonorCommittedTxnEntry", NamespaceString::kRsOplogNamespace, [&] { WriteUnitOfWork wuow(opCtx); // Write the no-op entry and update 'config.transactions'. @@ -1790,7 +1790,7 @@ ShardMergeRecipientService::Instance::_advanceMajorityCommitTsToBkpCursorCheckpo writeConflictRetry(opCtx, "mergeRecipientWriteNoopToAdvanceStableTimestamp", - NamespaceString::kRsOplogNamespace.ns(), + NamespaceString::kRsOplogNamespace, [&] { if (token.isCanceled()) { return; @@ -2030,7 +2030,7 @@ void ShardMergeRecipientService::Instance::_writeStateDoc( str::stream() << nss.toStringForErrorMsg() << " does not exist", collection.exists()); - writeConflictRetry(opCtx, "writeShardMergeRecipientStateDoc", nss.ns(), [&]() { + writeConflictRetry(opCtx, "writeShardMergeRecipientStateDoc", nss, [&]() { WriteUnitOfWork wunit(opCtx); if (registerChange) @@ -2278,7 +2278,7 @@ SemiFuture<void> ShardMergeRecipientService::Instance::_durablyPersistCommitAbor str::stream() << nss.toStringForErrorMsg() << " does not exist", collection); - writeConflictRetry(opCtx, "markShardMergeStateDocGarbageCollectable", nss.ns(), [&]() { + writeConflictRetry(opCtx, "markShardMergeStateDocGarbageCollectable", nss, [&]() { WriteUnitOfWork wuow(opCtx); auto oplogSlot = LocalOplogInfo::get(opCtx)->getNextOpTimes(opCtx, 1U)[0]; const auto originalRecordId = diff --git a/src/mongo/db/repl/storage_interface_impl.cpp b/src/mongo/db/repl/storage_interface_impl.cpp index 9b35e5ebbbe..2bc439a80d6 100644 --- a/src/mongo/db/repl/storage_interface_impl.cpp +++ b/src/mongo/db/repl/storage_interface_impl.cpp @@ -239,7 +239,7 @@ StorageInterfaceImpl::createCollectionForBulkLoading( std::unique_ptr<CollectionBulkLoader> loader; // Retry if WCE. - Status status = writeConflictRetry(opCtx.get(), "beginCollectionClone", nss.ns(), [&] { + Status status = writeConflictRetry(opCtx.get(), "beginCollectionClone", nss, [&] { UnreplicatedWritesBlock uwb(opCtx.get()); // Get locks and create the collection. @@ -405,7 +405,7 @@ Status StorageInterfaceImpl::dropReplicatedDatabases(OperationContext* opCtx) { hasLocalDatabase = true; continue; } - writeConflictRetry(opCtx, "dropReplicatedDatabases", toStringForLogging(dbName), [&] { + writeConflictRetry(opCtx, "dropReplicatedDatabases", NamespaceString(dbName), [&] { if (auto db = databaseHolder->getDb(opCtx, dbName)) { WriteUnitOfWork wuow(opCtx); databaseHolder->dropDb(opCtx, db); @@ -457,7 +457,7 @@ Status StorageInterfaceImpl::createCollection(OperationContext* opCtx, const bool createIdIndex, const BSONObj& idIndexSpec) { try { - return writeConflictRetry(opCtx, "StorageInterfaceImpl::createCollection", nss.ns(), [&] { + return writeConflictRetry(opCtx, "StorageInterfaceImpl::createCollection", nss, [&] { AutoGetDb databaseWriteGuard(opCtx, nss.dbName(), MODE_IX); auto db = databaseWriteGuard.ensureDbExists(opCtx); invariant(db); @@ -495,20 +495,19 @@ Status StorageInterfaceImpl::createIndexesOnEmptyCollection( return Status::OK(); try { - writeConflictRetry( - opCtx, "StorageInterfaceImpl::createIndexesOnEmptyCollection", nss.ns(), [&] { - AutoGetCollection autoColl( - opCtx, nss, fixLockModeForSystemDotViewsChanges(nss, MODE_X)); - CollectionWriter collection(opCtx, nss); - - WriteUnitOfWork wunit(opCtx); - // Use IndexBuildsCoordinator::createIndexesOnEmptyCollection() rather than - // IndexCatalog::createIndexOnEmptyCollection() as the former generates - // 'createIndexes' oplog entry for replicated writes. - IndexBuildsCoordinator::get(opCtx)->createIndexesOnEmptyCollection( - opCtx, collection, secondaryIndexSpecs, false /* fromMigrate */); - wunit.commit(); - }); + writeConflictRetry(opCtx, "StorageInterfaceImpl::createIndexesOnEmptyCollection", nss, [&] { + AutoGetCollection autoColl( + opCtx, nss, fixLockModeForSystemDotViewsChanges(nss, MODE_X)); + CollectionWriter collection(opCtx, nss); + + WriteUnitOfWork wunit(opCtx); + // Use IndexBuildsCoordinator::createIndexesOnEmptyCollection() rather than + // IndexCatalog::createIndexOnEmptyCollection() as the former generates + // 'createIndexes' oplog entry for replicated writes. + IndexBuildsCoordinator::get(opCtx)->createIndexesOnEmptyCollection( + opCtx, collection, secondaryIndexSpecs, false /* fromMigrate */); + wunit.commit(); + }); } catch (DBException& ex) { return ex.toStatus(); } @@ -518,7 +517,7 @@ Status StorageInterfaceImpl::createIndexesOnEmptyCollection( Status StorageInterfaceImpl::dropCollection(OperationContext* opCtx, const NamespaceString& nss) { try { - return writeConflictRetry(opCtx, "StorageInterfaceImpl::dropCollection", nss.ns(), [&] { + return writeConflictRetry(opCtx, "StorageInterfaceImpl::dropCollection", nss, [&] { AutoGetDb autoDb(opCtx, nss.dbName(), MODE_IX); Lock::CollectionLock collLock(opCtx, nss, MODE_X); if (!autoDb.getDb()) { @@ -540,7 +539,7 @@ Status StorageInterfaceImpl::dropCollection(OperationContext* opCtx, const Names Status StorageInterfaceImpl::truncateCollection(OperationContext* opCtx, const NamespaceString& nss) { - return writeConflictRetry(opCtx, "StorageInterfaceImpl::truncateCollection", nss.ns(), [&] { + return writeConflictRetry(opCtx, "StorageInterfaceImpl::truncateCollection", nss, [&] { AutoGetCollection autoColl(opCtx, nss, MODE_X); auto collectionResult = getCollection(autoColl, nss, "The collection must exist before truncating."); @@ -569,7 +568,7 @@ Status StorageInterfaceImpl::renameCollection(OperationContext* opCtx, << "; to NS: " << toNS.toStringForErrorMsg()); } - return writeConflictRetry(opCtx, "StorageInterfaceImpl::renameCollection", fromNS.ns(), [&] { + return writeConflictRetry(opCtx, "StorageInterfaceImpl::renameCollection", fromNS, [&] { AutoGetDb autoDB(opCtx, fromNS.dbName(), MODE_X); if (!autoDB.getDb()) { return Status(ErrorCodes::NamespaceNotFound, @@ -602,7 +601,7 @@ Status StorageInterfaceImpl::setIndexIsMultikey(OperationContext* opCtx, << " (" << collectionUUID << ") as multikey at null timestamp"); } - return writeConflictRetry(opCtx, "StorageInterfaceImpl::setIndexIsMultikey", nss.ns(), [&] { + return writeConflictRetry(opCtx, "StorageInterfaceImpl::setIndexIsMultikey", nss, [&] { const NamespaceStringOrUUID nsOrUUID(nss.dbName(), collectionUUID); boost::optional<AutoGetCollection> autoColl; try { @@ -670,186 +669,180 @@ StatusWith<std::vector<BSONObj>> _findOrDeleteDocuments( auto isFind = mode == FindDeleteMode::kFind; auto opStr = isFind ? "StorageInterfaceImpl::find" : "StorageInterfaceImpl::delete"; - return writeConflictRetry( - opCtx, opStr, nsOrUUID.toString(), [&]() -> StatusWith<std::vector<BSONObj>> { - // We need to explicitly use this in a few places to help the type inference. Use a - // shorthand. - using Result = StatusWith<std::vector<BSONObj>>; - - auto collectionAccessMode = isFind ? MODE_IS : MODE_IX; - const auto collection = - acquireCollection(opCtx, - CollectionAcquisitionRequest::fromOpCtx( - opCtx, nsOrUUID, AcquisitionPrerequisites::kWrite), - collectionAccessMode); - if (!collection.exists()) { - return Status{ErrorCodes::NamespaceNotFound, - str::stream() - << "Collection [" << nsOrUUID.toString() << "] not found. " - << "Unable to proceed with " << opStr << "."}; - } + return writeConflictRetry(opCtx, opStr, nsOrUUID, [&]() -> StatusWith<std::vector<BSONObj>> { + // We need to explicitly use this in a few places to help the type inference. Use a + // shorthand. + using Result = StatusWith<std::vector<BSONObj>>; - auto isForward = scanDirection == StorageInterface::ScanDirection::kForward; - auto direction = isForward ? InternalPlanner::FORWARD : InternalPlanner::BACKWARD; + auto collectionAccessMode = isFind ? MODE_IS : MODE_IX; + const auto collection = + acquireCollection(opCtx, + CollectionAcquisitionRequest::fromOpCtx( + opCtx, nsOrUUID, AcquisitionPrerequisites::kWrite), + collectionAccessMode); + if (!collection.exists()) { + return Status{ErrorCodes::NamespaceNotFound, + str::stream() << "Collection [" << nsOrUUID.toString() << "] not found. " + << "Unable to proceed with " << opStr << "."}; + } - std::unique_ptr<PlanExecutor, PlanExecutor::Deleter> planExecutor; - if (!indexName) { - if (!startKey.isEmpty()) { - return Result(ErrorCodes::NoSuchKey, - "non-empty startKey not allowed for collection scan"); - } - if (boundInclusion != BoundInclusion::kIncludeStartKeyOnly) { - return Result( - ErrorCodes::InvalidOptions, - "bound inclusion must be BoundInclusion::kIncludeStartKeyOnly for " - "collection scan"); - } - // Use collection scan. - planExecutor = isFind - ? InternalPlanner::collectionScan(opCtx, - &collection.getCollectionPtr(), - PlanYieldPolicy::YieldPolicy::NO_YIELD, - direction) - : InternalPlanner::deleteWithCollectionScan( - opCtx, - collection, - makeDeleteStageParamsForDeleteDocuments(), - PlanYieldPolicy::YieldPolicy::NO_YIELD, - direction); - } else if (*indexName == kIdIndexName && collection.getCollectionPtr()->isClustered() && - collection.getCollectionPtr() - ->getClusteredInfo() - ->getIndexSpec() - .getKey() - .firstElement() - .fieldNameStringData() == "_id") { - - auto collScanBoundInclusion = [boundInclusion]() { - switch (boundInclusion) { - case BoundInclusion::kExcludeBothStartAndEndKeys: - return CollectionScanParams::ScanBoundInclusion:: - kExcludeBothStartAndEndRecords; - case BoundInclusion::kIncludeStartKeyOnly: - return CollectionScanParams::ScanBoundInclusion:: - kIncludeStartRecordOnly; - case BoundInclusion::kIncludeEndKeyOnly: - return CollectionScanParams::ScanBoundInclusion::kIncludeEndRecordOnly; - case BoundInclusion::kIncludeBothStartAndEndKeys: - return CollectionScanParams::ScanBoundInclusion:: - kIncludeBothStartAndEndRecords; - default: - MONGO_UNREACHABLE; - } - }(); - - boost::optional<RecordIdBound> minRecord, maxRecord; - if (direction == InternalPlanner::FORWARD) { - if (!startKey.isEmpty()) { - minRecord = RecordIdBound(record_id_helpers::keyForObj(startKey)); - } - if (!endKey.isEmpty()) { - maxRecord = RecordIdBound(record_id_helpers::keyForObj(endKey)); - } - } else { - if (!startKey.isEmpty()) { - maxRecord = RecordIdBound(record_id_helpers::keyForObj(startKey)); - } - if (!endKey.isEmpty()) { - minRecord = RecordIdBound(record_id_helpers::keyForObj(endKey)); - } + auto isForward = scanDirection == StorageInterface::ScanDirection::kForward; + auto direction = isForward ? InternalPlanner::FORWARD : InternalPlanner::BACKWARD; + + std::unique_ptr<PlanExecutor, PlanExecutor::Deleter> planExecutor; + if (!indexName) { + if (!startKey.isEmpty()) { + return Result(ErrorCodes::NoSuchKey, + "non-empty startKey not allowed for collection scan"); + } + if (boundInclusion != BoundInclusion::kIncludeStartKeyOnly) { + return Result(ErrorCodes::InvalidOptions, + "bound inclusion must be BoundInclusion::kIncludeStartKeyOnly for " + "collection scan"); + } + // Use collection scan. + planExecutor = isFind + ? InternalPlanner::collectionScan(opCtx, + &collection.getCollectionPtr(), + PlanYieldPolicy::YieldPolicy::NO_YIELD, + direction) + : InternalPlanner::deleteWithCollectionScan( + opCtx, + collection, + makeDeleteStageParamsForDeleteDocuments(), + PlanYieldPolicy::YieldPolicy::NO_YIELD, + direction); + } else if (*indexName == kIdIndexName && collection.getCollectionPtr()->isClustered() && + collection.getCollectionPtr() + ->getClusteredInfo() + ->getIndexSpec() + .getKey() + .firstElement() + .fieldNameStringData() == "_id") { + + auto collScanBoundInclusion = [boundInclusion]() { + switch (boundInclusion) { + case BoundInclusion::kExcludeBothStartAndEndKeys: + return CollectionScanParams::ScanBoundInclusion:: + kExcludeBothStartAndEndRecords; + case BoundInclusion::kIncludeStartKeyOnly: + return CollectionScanParams::ScanBoundInclusion::kIncludeStartRecordOnly; + case BoundInclusion::kIncludeEndKeyOnly: + return CollectionScanParams::ScanBoundInclusion::kIncludeEndRecordOnly; + case BoundInclusion::kIncludeBothStartAndEndKeys: + return CollectionScanParams::ScanBoundInclusion:: + kIncludeBothStartAndEndRecords; + default: + MONGO_UNREACHABLE; } + }(); - planExecutor = isFind - ? InternalPlanner::collectionScan(opCtx, - &collection.getCollectionPtr(), - PlanYieldPolicy::YieldPolicy::NO_YIELD, - direction, - boost::none /* resumeAfterId */, - minRecord, - maxRecord, - collScanBoundInclusion) - : InternalPlanner::deleteWithCollectionScan( - opCtx, - collection, - makeDeleteStageParamsForDeleteDocuments(), - PlanYieldPolicy::YieldPolicy::NO_YIELD, - direction, - minRecord, - maxRecord, - collScanBoundInclusion); - } else { - // Use index scan. - auto indexCatalog = collection.getCollectionPtr()->getIndexCatalog(); - invariant(indexCatalog); - const IndexDescriptor* indexDescriptor = indexCatalog->findIndexByName( - opCtx, *indexName, IndexCatalog::InclusionPolicy::kReady); - if (!indexDescriptor) { - return Result(ErrorCodes::IndexNotFound, - str::stream() - << "Index not found, ns:" << nsOrUUID.toStringForErrorMsg() - << ", index: " << *indexName); + boost::optional<RecordIdBound> minRecord, maxRecord; + if (direction == InternalPlanner::FORWARD) { + if (!startKey.isEmpty()) { + minRecord = RecordIdBound(record_id_helpers::keyForObj(startKey)); } - if (indexDescriptor->isPartial()) { - return Result(ErrorCodes::IndexOptionsConflict, - str::stream() - << "Partial index is not allowed for this operation, ns:" - << nsOrUUID.toStringForErrorMsg() - << ", index: " << *indexName); + if (!endKey.isEmpty()) { + maxRecord = RecordIdBound(record_id_helpers::keyForObj(endKey)); } - - KeyPattern keyPattern(indexDescriptor->keyPattern()); - auto minKey = Helpers::toKeyFormat(keyPattern.extendRangeBound({}, false)); - auto maxKey = Helpers::toKeyFormat(keyPattern.extendRangeBound({}, true)); - auto bounds = - isForward ? std::make_pair(minKey, maxKey) : std::make_pair(maxKey, minKey); + } else { if (!startKey.isEmpty()) { - bounds.first = startKey; + maxRecord = RecordIdBound(record_id_helpers::keyForObj(startKey)); } if (!endKey.isEmpty()) { - bounds.second = endKey; + minRecord = RecordIdBound(record_id_helpers::keyForObj(endKey)); } - planExecutor = isFind - ? InternalPlanner::indexScan(opCtx, - &collection.getCollectionPtr(), - indexDescriptor, - bounds.first, - bounds.second, - boundInclusion, - PlanYieldPolicy::YieldPolicy::NO_YIELD, - direction, - InternalPlanner::IXSCAN_FETCH) - : InternalPlanner::deleteWithIndexScan( - opCtx, - collection, - makeDeleteStageParamsForDeleteDocuments(), - indexDescriptor, - bounds.first, - bounds.second, - boundInclusion, - PlanYieldPolicy::YieldPolicy::NO_YIELD, - direction); } - std::vector<BSONObj> docs; + planExecutor = isFind + ? InternalPlanner::collectionScan(opCtx, + &collection.getCollectionPtr(), + PlanYieldPolicy::YieldPolicy::NO_YIELD, + direction, + boost::none /* resumeAfterId */, + minRecord, + maxRecord, + collScanBoundInclusion) + : InternalPlanner::deleteWithCollectionScan( + opCtx, + collection, + makeDeleteStageParamsForDeleteDocuments(), + PlanYieldPolicy::YieldPolicy::NO_YIELD, + direction, + minRecord, + maxRecord, + collScanBoundInclusion); + } else { + // Use index scan. + auto indexCatalog = collection.getCollectionPtr()->getIndexCatalog(); + invariant(indexCatalog); + const IndexDescriptor* indexDescriptor = indexCatalog->findIndexByName( + opCtx, *indexName, IndexCatalog::InclusionPolicy::kReady); + if (!indexDescriptor) { + return Result(ErrorCodes::IndexNotFound, + str::stream() + << "Index not found, ns:" << nsOrUUID.toStringForErrorMsg() + << ", index: " << *indexName); + } + if (indexDescriptor->isPartial()) { + return Result(ErrorCodes::IndexOptionsConflict, + str::stream() + << "Partial index is not allowed for this operation, ns:" + << nsOrUUID.toStringForErrorMsg() << ", index: " << *indexName); + } - try { - BSONObj out; - PlanExecutor::ExecState state = PlanExecutor::ExecState::ADVANCED; - while (state == PlanExecutor::ExecState::ADVANCED && docs.size() < limit) { - state = planExecutor->getNext(&out, nullptr); - if (state == PlanExecutor::ExecState::ADVANCED) { - docs.push_back(out.getOwned()); - } + KeyPattern keyPattern(indexDescriptor->keyPattern()); + auto minKey = Helpers::toKeyFormat(keyPattern.extendRangeBound({}, false)); + auto maxKey = Helpers::toKeyFormat(keyPattern.extendRangeBound({}, true)); + auto bounds = + isForward ? std::make_pair(minKey, maxKey) : std::make_pair(maxKey, minKey); + if (!startKey.isEmpty()) { + bounds.first = startKey; + } + if (!endKey.isEmpty()) { + bounds.second = endKey; + } + planExecutor = isFind + ? InternalPlanner::indexScan(opCtx, + &collection.getCollectionPtr(), + indexDescriptor, + bounds.first, + bounds.second, + boundInclusion, + PlanYieldPolicy::YieldPolicy::NO_YIELD, + direction, + InternalPlanner::IXSCAN_FETCH) + : InternalPlanner::deleteWithIndexScan(opCtx, + collection, + makeDeleteStageParamsForDeleteDocuments(), + indexDescriptor, + bounds.first, + bounds.second, + boundInclusion, + PlanYieldPolicy::YieldPolicy::NO_YIELD, + direction); + } + + std::vector<BSONObj> docs; + + try { + BSONObj out; + PlanExecutor::ExecState state = PlanExecutor::ExecState::ADVANCED; + while (state == PlanExecutor::ExecState::ADVANCED && docs.size() < limit) { + state = planExecutor->getNext(&out, nullptr); + if (state == PlanExecutor::ExecState::ADVANCED) { + docs.push_back(out.getOwned()); } - } catch (const WriteConflictException&) { - // Re-throw the WCE, since it will get caught be a retry loop at a higher level. - throw; - } catch (const DBException&) { - return exceptionToStatus(); } + } catch (const WriteConflictException&) { + // Re-throw the WCE, since it will get caught be a retry loop at a higher level. + throw; + } catch (const DBException&) { + return exceptionToStatus(); + } - return Result{docs}; - }); + return Result{docs}; + }); } StatusWith<BSONObj> _findOrDeleteById(OperationContext* opCtx, @@ -984,7 +977,7 @@ Status _updateWithQuery(OperationContext* opCtx, invariant(PlanYieldPolicy::YieldPolicy::NO_YIELD == request.getYieldPolicy()); auto& nss = request.getNamespaceString(); - return writeConflictRetry(opCtx, "_updateWithQuery", nss.ns(), [&] { + return writeConflictRetry(opCtx, "_updateWithQuery", nss, [&] { const auto collection = acquireCollection( opCtx, CollectionAcquisitionRequest::fromOpCtx(opCtx, nss, AcquisitionPrerequisites::kWrite), @@ -1049,7 +1042,7 @@ Status StorageInterfaceImpl::upsertById(OperationContext* opCtx, } auto query = queryResult.getValue(); - return writeConflictRetry(opCtx, "StorageInterfaceImpl::upsertById", nsOrUUID.toString(), [&] { + return writeConflictRetry(opCtx, "StorageInterfaceImpl::upsertById", nsOrUUID, [&] { const auto collection = acquireCollection(opCtx, CollectionAcquisitionRequest::fromOpCtx( @@ -1151,7 +1144,7 @@ Status StorageInterfaceImpl::deleteByFilter(OperationContext* opCtx, // disallow client deletes from unrecognized system collections. request.setGod(true); - return writeConflictRetry(opCtx, "StorageInterfaceImpl::deleteByFilter", nss.ns(), [&] { + return writeConflictRetry(opCtx, "StorageInterfaceImpl::deleteByFilter", nss, [&] { const auto collection = acquireCollection( opCtx, CollectionAcquisitionRequest::fromOpCtx(opCtx, nss, AcquisitionPrerequisites::kWrite), diff --git a/src/mongo/db/repl/storage_interface_impl_test.cpp b/src/mongo/db/repl/storage_interface_impl_test.cpp index ac3b69e80f5..8837d1831bf 100644 --- a/src/mongo/db/repl/storage_interface_impl_test.cpp +++ b/src/mongo/db/repl/storage_interface_impl_test.cpp @@ -120,7 +120,7 @@ CollectionOptions createOplogCollectionOptions() { void createCollection(OperationContext* opCtx, const NamespaceString& nss, const CollectionOptions& options = generateOptionsWithUuid()) { - writeConflictRetry(opCtx, "createCollection", nss.ns(), [&] { + writeConflictRetry(opCtx, "createCollection", nss, [&] { Lock::DBLock dblk(opCtx, nss.dbName(), MODE_X); OldClientContext ctx(opCtx, nss); auto db = ctx.db(); diff --git a/src/mongo/db/repl/storage_timestamp_test.cpp b/src/mongo/db/repl/storage_timestamp_test.cpp index 76ee5586e2e..00da9d8e484 100644 --- a/src/mongo/db/repl/storage_timestamp_test.cpp +++ b/src/mongo/db/repl/storage_timestamp_test.cpp @@ -349,7 +349,7 @@ public: } void create(NamespaceString nss) const { - ::mongo::writeConflictRetry(_opCtx, "deleteAll", nss.ns(), [&] { + ::mongo::writeConflictRetry(_opCtx, "deleteAll", nss, [&] { _opCtx->recoveryUnit()->setTimestampReadSource(RecoveryUnit::ReadSource::kNoTimestamp); _opCtx->recoveryUnit()->abandonSnapshot(); AutoGetCollection collRaii(_opCtx, nss, LockMode::MODE_X); diff --git a/src/mongo/db/repl/tenant_migration_access_blocker_util.cpp b/src/mongo/db/repl/tenant_migration_access_blocker_util.cpp index 0592fb30dd5..145255958dc 100644 --- a/src/mongo/db/repl/tenant_migration_access_blocker_util.cpp +++ b/src/mongo/db/repl/tenant_migration_access_blocker_util.cpp @@ -637,7 +637,7 @@ void performNoopWrite(OperationContext* opCtx, StringData msg) { replCoord->canAcceptWritesForDatabase(opCtx, DatabaseName::kAdmin)); writeConflictRetry( - opCtx, "performNoopWrite", NamespaceString::kRsOplogNamespace.ns(), [&opCtx, &msg] { + opCtx, "performNoopWrite", NamespaceString::kRsOplogNamespace, [&opCtx, &msg] { WriteUnitOfWork wuow(opCtx); opCtx->getClient()->getServiceContext()->getOpObserver()->onOpMessage( opCtx, BSON("msg" << msg)); diff --git a/src/mongo/db/repl/tenant_migration_donor_service.cpp b/src/mongo/db/repl/tenant_migration_donor_service.cpp index 997e920370b..723adb47d93 100644 --- a/src/mongo/db/repl/tenant_migration_donor_service.cpp +++ b/src/mongo/db/repl/tenant_migration_donor_service.cpp @@ -539,7 +539,7 @@ ExecutorFuture<repl::OpTime> TenantMigrationDonorService::Instance::_insertState MODE_IX); writeConflictRetry( - opCtx, "TenantMigrationDonorInsertStateDoc", _stateDocumentsNS.ns(), [&] { + opCtx, "TenantMigrationDonorInsertStateDoc", _stateDocumentsNS, [&] { const auto filter = BSON(TenantMigrationDonorDocument::kIdFieldName << _migrationUuid); const auto updateMod = [&]() { @@ -594,7 +594,7 @@ ExecutorFuture<repl::OpTime> TenantMigrationDonorService::Instance::_updateState collection); writeConflictRetry( - opCtx, "TenantMigrationDonorUpdateStateDoc", _stateDocumentsNS.ns(), [&] { + opCtx, "TenantMigrationDonorUpdateStateDoc", _stateDocumentsNS, [&] { WriteUnitOfWork wuow(opCtx); const auto originalRecordId = Helpers::findOne( @@ -718,7 +718,7 @@ TenantMigrationDonorService::Instance::_markStateDocAsGarbageCollectable( writeConflictRetry( opCtx, "TenantMigrationDonorMarkStateDocAsGarbageCollectable", - _stateDocumentsNS.ns(), + _stateDocumentsNS, [&] { const auto filter = BSON(TenantMigrationDonorDocument::kIdFieldName << _migrationUuid); diff --git a/src/mongo/db/repl/tenant_migration_recipient_entry_helpers.cpp b/src/mongo/db/repl/tenant_migration_recipient_entry_helpers.cpp index 2ab83a7b44b..81d1d096465 100644 --- a/src/mongo/db/repl/tenant_migration_recipient_entry_helpers.cpp +++ b/src/mongo/db/repl/tenant_migration_recipient_entry_helpers.cpp @@ -71,7 +71,7 @@ Status insertStateDoc(OperationContext* opCtx, const TenantMigrationRecipientDoc repl::ReplicationCoordinator::get(opCtx)->canAcceptWritesFor(opCtx, nss)); return writeConflictRetry( - opCtx, "insertTenantMigrationRecipientStateDoc", nss.ns(), [&]() -> Status { + opCtx, "insertTenantMigrationRecipientStateDoc", nss, [&]() -> Status { // Insert the 'stateDoc' if no active tenant migration found for the 'tenantId' provided // in the 'stateDoc'. Tenant Migration is considered as active for a tenantId if a state // document exists on the disk for that 'tenantId' and not marked to be garbage @@ -113,7 +113,7 @@ Status updateStateDoc(OperationContext* opCtx, const TenantMigrationRecipientDoc } return writeConflictRetry( - opCtx, "updateTenantMigrationRecipientStateDoc", nss.ns(), [&]() -> Status { + opCtx, "updateTenantMigrationRecipientStateDoc", nss, [&]() -> Status { auto updateResult = Helpers::upsert(opCtx, collection, stateDoc.toBSON(), /*fromMigrate=*/false); if (updateResult.numMatched == 0) { @@ -145,11 +145,10 @@ StatusWith<bool> deleteStateDocIfMarkedAsGarbageCollectable(OperationContext* op auto query = BSON(TenantMigrationRecipientDocument::kTenantIdFieldName << tenantId << TenantMigrationRecipientDocument::kExpireAtFieldName << BSON("$exists" << 1)); - return writeConflictRetry( - opCtx, "deleteTenantMigrationRecipientStateDoc", nss.ns(), [&]() -> bool { - auto nDeleted = deleteObjects(opCtx, collection, query, true /* justOne */); - return nDeleted > 0; - }); + return writeConflictRetry(opCtx, "deleteTenantMigrationRecipientStateDoc", nss, [&]() -> bool { + auto nDeleted = deleteObjects(opCtx, collection, query, true /* justOne */); + return nDeleted > 0; + }); } StatusWith<TenantMigrationRecipientDocument> getStateDoc(OperationContext* opCtx, diff --git a/src/mongo/db/repl/tenant_migration_recipient_service.cpp b/src/mongo/db/repl/tenant_migration_recipient_service.cpp index 1ac857ed1f4..39551cbe24e 100644 --- a/src/mongo/db/repl/tenant_migration_recipient_service.cpp +++ b/src/mongo/db/repl/tenant_migration_recipient_service.cpp @@ -1096,7 +1096,7 @@ void TenantMigrationRecipientService::Instance::_processCommittedTransactionEntr AutoGetOplog oplogWrite(opCtx, OplogAccessMode::kWrite); writeConflictRetry( - opCtx, "writeDonorCommittedTxnEntry", NamespaceString::kRsOplogNamespace.ns(), [&] { + opCtx, "writeDonorCommittedTxnEntry", NamespaceString::kRsOplogNamespace, [&] { WriteUnitOfWork wuow(opCtx); // Write the no-op entry and update 'config.transactions'. @@ -1822,7 +1822,7 @@ SemiFuture<void> TenantMigrationRecipientService::Instance::_markStateDocAsGarba writeConflictRetry( opCtx, "markTenantMigrationRecipientStateDocGarbageCollectable", - NamespaceString::kTenantMigrationRecipientsNamespace.ns(), + NamespaceString::kTenantMigrationRecipientsNamespace, [&]() { WriteUnitOfWork wuow(opCtx); auto oplogSlot = LocalOplogInfo::get(opCtx)->getNextOpTimes(opCtx, 1U)[0]; diff --git a/src/mongo/db/repl/tenant_migration_shard_merge_util.cpp b/src/mongo/db/repl/tenant_migration_shard_merge_util.cpp index 3c9585415c0..3be275c7b44 100644 --- a/src/mongo/db/repl/tenant_migration_shard_merge_util.cpp +++ b/src/mongo/db/repl/tenant_migration_shard_merge_util.cpp @@ -234,7 +234,7 @@ void wiredTigerImportFromBackupCursor(OperationContext* opCtx, // Import the collection and it's indexes. const auto nss = metadata.ns; - writeConflictRetry(opCtx, "importCollection", nss.ns(), [&] { + writeConflictRetry(opCtx, "importCollection", nss, [&] { LOGV2_DEBUG(6114303, 1, "Importing donor collection", "ns"_attr = nss); AutoGetDb autoDb(opCtx, nss.dbName(), MODE_IX); auto db = autoDb.ensureDbExists(opCtx); diff --git a/src/mongo/db/repl/tenant_migration_util.cpp b/src/mongo/db/repl/tenant_migration_util.cpp index 511a16d3405..b97faca5845 100644 --- a/src/mongo/db/repl/tenant_migration_util.cpp +++ b/src/mongo/db/repl/tenant_migration_util.cpp @@ -138,7 +138,7 @@ repl::OpTime storeExternalClusterTimeKeyDocs(std::vector<ExternalKeysCollectionD AcquisitionPrerequisites::kWrite), MODE_IX); - writeConflictRetry(opCtx, "CloneExternalKeyDocs", nss.ns(), [&] { + writeConflictRetry(opCtx, "CloneExternalKeyDocs", nss, [&] { // Note that each external key's _id is generated by the migration, so this upsert can // only insert. const auto filter = @@ -158,7 +158,7 @@ repl::OpTime storeExternalClusterTimeKeyDocs(std::vector<ExternalKeysCollectionD void createOplogViewForTenantMigrations(OperationContext* opCtx, Database* db) { writeConflictRetry( - opCtx, "createDonorOplogView", NamespaceString::kTenantMigrationOplogView.ns(), [&] { + opCtx, "createDonorOplogView", NamespaceString::kTenantMigrationOplogView, [&] { { // Create 'system.views' in a separate WUOW if it does not exist. WriteUnitOfWork wuow(opCtx); @@ -619,7 +619,7 @@ ExecutorFuture<void> markExternalKeysAsGarbageCollectable( MODE_IX); writeConflictRetry( - opCtx, "TenantMigrationMarkExternalKeysAsGarbageCollectable", nss.ns(), [&] { + opCtx, "TenantMigrationMarkExternalKeysAsGarbageCollectable", nss, [&] { auto request = UpdateRequest(); request.setNamespaceString(nss); request.setQuery( diff --git a/src/mongo/db/repl/tenant_oplog_applier.cpp b/src/mongo/db/repl/tenant_oplog_applier.cpp index 12da8edeb99..41b9c1a1738 100644 --- a/src/mongo/db/repl/tenant_oplog_applier.cpp +++ b/src/mongo/db/repl/tenant_oplog_applier.cpp @@ -884,7 +884,7 @@ void TenantOplogApplier::_writeSessionNoOpsForRange( } writeConflictRetry( - opCtx.get(), "writeTenantNoOps", NamespaceString::kRsOplogNamespace.ns(), [&] { + opCtx.get(), "writeTenantNoOps", NamespaceString::kRsOplogNamespace, [&] { WriteUnitOfWork wuow(opCtx.get()); // Write the pre/post image entry, if it exists. @@ -931,36 +931,35 @@ void TenantOplogApplier::_writeNoOpsForRange(OpObserver* opObserver, AutoGetOplog oplogWrite(opCtx.get(), OplogAccessMode::kWrite); auto tenantLocks = _acquireIntentExclusiveTenantLocks(opCtx.get(), begin, end); - writeConflictRetry( - opCtx.get(), "writeTenantNoOps", NamespaceString::kRsOplogNamespace.ns(), [&] { - WriteUnitOfWork wuow(opCtx.get()); - for (auto iter = begin; iter != end; iter++) { - const auto& entry = *iter->first; - if (isResumeTokenNoop(entry)) { - // We don't want to write noops for resume token noop oplog entries. They would - // not be applied in a change stream anyways. - continue; - } - // We don't need to link no-ops entries for operations done outside of a session. - const boost::optional<OpTime> preImageOpTime = boost::none; - const boost::optional<OpTime> postImageOpTime = boost::none; - const boost::optional<OpTime> prevWriteOpTimeInTransaction = boost::none; - opObserver->onInternalOpMessage( - opCtx.get(), - entry.getNss(), - entry.getUuid(), - {}, // Empty 'o' field. - entry.getEntry().toBSON(), - // We link the no-ops together by recipient op time the same way the actual ops - // were linked together by donor op time. This is to allow retryable writes - // and changestreams to find the ops they need. - preImageOpTime, - postImageOpTime, - prevWriteOpTimeInTransaction, - *iter->second); + writeConflictRetry(opCtx.get(), "writeTenantNoOps", NamespaceString::kRsOplogNamespace, [&] { + WriteUnitOfWork wuow(opCtx.get()); + for (auto iter = begin; iter != end; iter++) { + const auto& entry = *iter->first; + if (isResumeTokenNoop(entry)) { + // We don't want to write noops for resume token noop oplog entries. They would + // not be applied in a change stream anyways. + continue; } - wuow.commit(); - }); + // We don't need to link no-ops entries for operations done outside of a session. + const boost::optional<OpTime> preImageOpTime = boost::none; + const boost::optional<OpTime> postImageOpTime = boost::none; + const boost::optional<OpTime> prevWriteOpTimeInTransaction = boost::none; + opObserver->onInternalOpMessage( + opCtx.get(), + entry.getNss(), + entry.getUuid(), + {}, // Empty 'o' field. + entry.getEntry().toBSON(), + // We link the no-ops together by recipient op time the same way the actual ops + // were linked together by donor op time. This is to allow retryable writes + // and changestreams to find the ops they need. + preImageOpTime, + postImageOpTime, + prevWriteOpTimeInTransaction, + *iter->second); + } + wuow.commit(); + }); } std::vector<Lock::TenantLock> TenantOplogApplier::_acquireIntentExclusiveTenantLocks( OperationContext* opCtx, diff --git a/src/mongo/db/repl/transaction_oplog_application.cpp b/src/mongo/db/repl/transaction_oplog_application.cpp index 4fef613e8e7..77630c0026f 100644 --- a/src/mongo/db/repl/transaction_oplog_application.cpp +++ b/src/mongo/db/repl/transaction_oplog_application.cpp @@ -209,7 +209,7 @@ Status _applyTransactionFromOplogChain(OperationContext* opCtx, const auto dbName = entry.getNss().dbName(); Status status = Status::OK(); - writeConflictRetry(opCtx, "replaying prepared transaction", dbName.db(), [&] { + writeConflictRetry(opCtx, "replaying prepared transaction", NamespaceString(dbName), [&] { WriteUnitOfWork wunit(opCtx); // We might replay a prepared transaction behind oldest timestamp. @@ -570,7 +570,7 @@ Status _applyPrepareTransaction(OperationContext* opCtx, opCtx->resetMultiDocumentTransactionState(); }); - return writeConflictRetry(opCtx, "applying prepare transaction", prepareOp.getNss().ns(), [&] { + return writeConflictRetry(opCtx, "applying prepare transaction", prepareOp.getNss(), [&] { // The write on transaction table may be applied concurrently, so refreshing // state from disk may read that write, causing starting a new transaction // on an existing txnNumber. Thus, we start a new transaction without diff --git a/src/mongo/db/s/config/sharding_catalog_manager_config_initialization_test.cpp b/src/mongo/db/s/config/sharding_catalog_manager_config_initialization_test.cpp index 7b0dbc44d55..d56db1e6ea4 100644 --- a/src/mongo/db/s/config/sharding_catalog_manager_config_initialization_test.cpp +++ b/src/mongo/db/s/config/sharding_catalog_manager_config_initialization_test.cpp @@ -248,7 +248,7 @@ TEST_F(ConfigInitializationTest, ReRunsIfDocRolledBackThenReElected) { auto opCtx = operationContext(); repl::UnreplicatedWritesBlock uwb(opCtx); auto nss = VersionType::ConfigNS; - writeConflictRetry(opCtx, "removeConfigDocuments", nss.ns(), [&] { + writeConflictRetry(opCtx, "removeConfigDocuments", nss, [&] { AutoGetCollection coll(opCtx, nss, MODE_IX); ASSERT_TRUE(coll); auto cursor = coll->getCursor(opCtx); diff --git a/src/mongo/db/s/global_index/global_index_cloning_service.cpp b/src/mongo/db/s/global_index/global_index_cloning_service.cpp index 78d4ffc27db..245df6bbd33 100644 --- a/src/mongo/db/s/global_index/global_index_cloning_service.cpp +++ b/src/mongo/db/s/global_index/global_index_cloning_service.cpp @@ -471,7 +471,7 @@ void GlobalIndexCloningService::CloningStateMachine::_ensureCollection(Operation invariant(!opCtx->lockState()->inAWriteUnitOfWork()); // Create the destination collection if necessary. - writeConflictRetry(opCtx, "CloningStateMachine::_ensureCollection", nss.toString(), [&] { + writeConflictRetry(opCtx, "CloningStateMachine::_ensureCollection", nss, [&] { const Collection* coll = CollectionCatalog::get(opCtx)->lookupCollectionByNamespace(opCtx, nss); if (coll) { diff --git a/src/mongo/db/s/migration_chunk_cloner_source_legacy_commands.cpp b/src/mongo/db/s/migration_chunk_cloner_source_legacy_commands.cpp index 12d83438f0c..b623db6b16d 100644 --- a/src/mongo/db/s/migration_chunk_cloner_source_legacy_commands.cpp +++ b/src/mongo/db/s/migration_chunk_cloner_source_legacy_commands.cpp @@ -302,7 +302,7 @@ public: writeConflictRetry( opCtx, "Fetching session related oplogs for migration", - NamespaceString::kRsOplogNamespace.ns(), + NamespaceString::kRsOplogNamespace, [&]() { AutoGetActiveCloner autoCloner(opCtx, migrationSessionId, false); opTime = autoCloner.getCloner()->nextSessionMigrationBatch(opCtx, arrBuilder); diff --git a/src/mongo/db/s/migration_destination_manager.cpp b/src/mongo/db/s/migration_destination_manager.cpp index 179910677b7..d430bbddf42 100644 --- a/src/mongo/db/s/migration_destination_manager.cpp +++ b/src/mongo/db/s/migration_destination_manager.cpp @@ -1711,7 +1711,7 @@ bool MigrationDestinationManager::_applyMigrateOp(OperationContext* opCtx, const uassertStatusOK(rs->goingToDelete(fullObj)); } - writeConflictRetry(opCtx, "transferModsDeletes", _nss.ns(), [&] { + writeConflictRetry(opCtx, "transferModsDeletes", _nss, [&] { deleteObjects(opCtx, collection, id, @@ -1769,7 +1769,7 @@ bool MigrationDestinationManager::_applyMigrateOp(OperationContext* opCtx, const } // We are in write lock here, so sure we aren't killing - writeConflictRetry(opCtx, "transferModsUpdates", _nss.ns(), [&] { + writeConflictRetry(opCtx, "transferModsUpdates", _nss, [&] { auto res = Helpers::upsert(opCtx, collection, updatedDoc, true); if (!res.upsertedId.isEmpty()) { changeInOrphans++; diff --git a/src/mongo/db/s/migration_util.cpp b/src/mongo/db/s/migration_util.cpp index e930e656923..214b79a1347 100644 --- a/src/mongo/db/s/migration_util.cpp +++ b/src/mongo/db/s/migration_util.cpp @@ -536,20 +536,19 @@ void notifyChangeStreamsOnRecipientFirstChunk(OperationContext* opCtx, // TODO (SERVER-71444): Fix to be interruptible or document exception. UninterruptibleLockGuard noInterrupt(opCtx->lockState()); // NOLINT. AutoGetOplog oplogWrite(opCtx, OplogAccessMode::kWrite); - writeConflictRetry( - opCtx, "migrateChunkToNewShard", NamespaceString::kRsOplogNamespace.ns(), [&] { - WriteUnitOfWork uow(opCtx); - serviceContext->getOpObserver()->onInternalOpMessage(opCtx, - collNss, - *collUUID, - BSON("msg" << dbgMessage), - o2Message, - boost::none, - boost::none, - boost::none, - boost::none); - uow.commit(); - }); + writeConflictRetry(opCtx, "migrateChunkToNewShard", NamespaceString::kRsOplogNamespace, [&] { + WriteUnitOfWork uow(opCtx); + serviceContext->getOpObserver()->onInternalOpMessage(opCtx, + collNss, + *collUUID, + BSON("msg" << dbgMessage), + o2Message, + boost::none, + boost::none, + boost::none, + boost::none); + uow.commit(); + }); } void notifyChangeStreamsOnDonorLastChunk(OperationContext* opCtx, @@ -570,20 +569,19 @@ void notifyChangeStreamsOnDonorLastChunk(OperationContext* opCtx, // TODO (SERVER-71444): Fix to be interruptible or document exception. UninterruptibleLockGuard noInterrupt(opCtx->lockState()); // NOLINT. AutoGetOplog oplogWrite(opCtx, OplogAccessMode::kWrite); - writeConflictRetry( - opCtx, "migrateLastChunkFromShard", NamespaceString::kRsOplogNamespace.ns(), [&] { - WriteUnitOfWork uow(opCtx); - serviceContext->getOpObserver()->onInternalOpMessage(opCtx, - collNss, - *collUUID, - BSON("msg" << oMessage), - o2Message, - boost::none, - boost::none, - boost::none, - boost::none); - uow.commit(); - }); + writeConflictRetry(opCtx, "migrateLastChunkFromShard", NamespaceString::kRsOplogNamespace, [&] { + WriteUnitOfWork uow(opCtx); + serviceContext->getOpObserver()->onInternalOpMessage(opCtx, + collNss, + *collUUID, + BSON("msg" << oMessage), + o2Message, + boost::none, + boost::none, + boost::none, + boost::none); + uow.commit(); + }); } void persistCommitDecision(OperationContext* opCtx, diff --git a/src/mongo/db/s/range_deletion_util.cpp b/src/mongo/db/s/range_deletion_util.cpp index b629d67f280..f403cf4b89c 100644 --- a/src/mongo/db/s/range_deletion_util.cpp +++ b/src/mongo/db/s/range_deletion_util.cpp @@ -422,7 +422,7 @@ void persistUpdatedNumOrphans(OperationContext* opCtx, // The DBDirectClient will not retry WriteConflictExceptions internally while holding an X // mode lock, so we need to retry at this level. writeConflictRetry( - opCtx, "updateOrphanCount", NamespaceString::kRangeDeletionNamespace.ns(), [&] { + opCtx, "updateOrphanCount", NamespaceString::kRangeDeletionNamespace, [&] { store.update(opCtx, query, BSON("$inc" << BSON(RangeDeletionTask::kNumOrphanDocsFieldName diff --git a/src/mongo/db/s/refine_collection_shard_key_coordinator.cpp b/src/mongo/db/s/refine_collection_shard_key_coordinator.cpp index 0da65b240fa..ad6ea0b6b20 100644 --- a/src/mongo/db/s/refine_collection_shard_key_coordinator.cpp +++ b/src/mongo/db/s/refine_collection_shard_key_coordinator.cpp @@ -65,21 +65,20 @@ void notifyChangeStreamsOnRefineCollectionShardKeyComplete(OperationContext* opC auto const serviceContext = opCtx->getClient()->getServiceContext(); - writeConflictRetry( - opCtx, "RefineCollectionShardKey", NamespaceString::kRsOplogNamespace.ns(), [&] { - AutoGetOplog oplogWrite(opCtx, OplogAccessMode::kWrite); - WriteUnitOfWork uow(opCtx); - serviceContext->getOpObserver()->onInternalOpMessage(opCtx, - collNss, - collUUID, - BSON("msg" << oMessage), - cmdBuilder.obj(), - boost::none, - boost::none, - boost::none, - boost::none); - uow.commit(); - }); + writeConflictRetry(opCtx, "RefineCollectionShardKey", NamespaceString::kRsOplogNamespace, [&] { + AutoGetOplog oplogWrite(opCtx, OplogAccessMode::kWrite); + WriteUnitOfWork uow(opCtx); + serviceContext->getOpObserver()->onInternalOpMessage(opCtx, + collNss, + collUUID, + BSON("msg" << oMessage), + cmdBuilder.obj(), + boost::none, + boost::none, + boost::none, + boost::none); + uow.commit(); + }); } } // namespace diff --git a/src/mongo/db/s/reshard_collection_coordinator.cpp b/src/mongo/db/s/reshard_collection_coordinator.cpp index df46663be3c..b2802d7483b 100644 --- a/src/mongo/db/s/reshard_collection_coordinator.cpp +++ b/src/mongo/db/s/reshard_collection_coordinator.cpp @@ -85,7 +85,7 @@ void notifyChangeStreamsOnReshardCollectionComplete(OperationContext* opCtx, const auto cmd = cmdBuilder.obj(); - writeConflictRetry(opCtx, "ReshardCollection", NamespaceString::kRsOplogNamespace.ns(), [&] { + writeConflictRetry(opCtx, "ReshardCollection", NamespaceString::kRsOplogNamespace, [&] { AutoGetOplog oplogWrite(opCtx, OplogAccessMode::kWrite); WriteUnitOfWork uow(opCtx); serviceContext->getOpObserver()->onInternalOpMessage(opCtx, diff --git a/src/mongo/db/s/resharding/resharding_data_copy_util.cpp b/src/mongo/db/s/resharding/resharding_data_copy_util.cpp index 715e90c6e43..62787fc90c7 100644 --- a/src/mongo/db/s/resharding/resharding_data_copy_util.cpp +++ b/src/mongo/db/s/resharding/resharding_data_copy_util.cpp @@ -60,7 +60,7 @@ void ensureCollectionExists(OperationContext* opCtx, invariant(!opCtx->lockState()->isLocked()); invariant(!opCtx->lockState()->inAWriteUnitOfWork()); - writeConflictRetry(opCtx, "resharding::data_copy::ensureCollectionExists", nss.toString(), [&] { + writeConflictRetry(opCtx, "resharding::data_copy::ensureCollectionExists", nss, [&] { AutoGetCollection coll(opCtx, nss, MODE_IX); if (coll) { return; @@ -78,20 +78,19 @@ void ensureCollectionDropped(OperationContext* opCtx, invariant(!opCtx->lockState()->isLocked()); invariant(!opCtx->lockState()->inAWriteUnitOfWork()); - writeConflictRetry( - opCtx, "resharding::data_copy::ensureCollectionDropped", nss.toString(), [&] { - AutoGetCollection coll(opCtx, nss, MODE_X); - if (!coll || (uuid && coll->uuid() != uuid)) { - // If the collection doesn't exist or exists with a different UUID, then the - // requested collection has been dropped already. - return; - } + writeConflictRetry(opCtx, "resharding::data_copy::ensureCollectionDropped", nss, [&] { + AutoGetCollection coll(opCtx, nss, MODE_X); + if (!coll || (uuid && coll->uuid() != uuid)) { + // If the collection doesn't exist or exists with a different UUID, then the + // requested collection has been dropped already. + return; + } - WriteUnitOfWork wuow(opCtx); - uassertStatusOK(coll.getDb()->dropCollectionEvenIfSystem( - opCtx, nss, {} /* dropOpTime */, true /* markFromMigrate */)); - wuow.commit(); - }); + WriteUnitOfWork wuow(opCtx); + uassertStatusOK(coll.getDb()->dropCollectionEvenIfSystem( + opCtx, nss, {} /* dropOpTime */, true /* markFromMigrate */)); + wuow.commit(); + }); } void ensureOplogCollectionsDropped(OperationContext* opCtx, @@ -237,7 +236,7 @@ std::vector<InsertStatement> fillBatchForInsert(Pipeline& pipeline, int batchSiz int insertBatch(OperationContext* opCtx, const NamespaceString& nss, std::vector<InsertStatement>& batch) { - return writeConflictRetry(opCtx, "resharding::data_copy::insertBatch", nss.ns(), [&] { + return writeConflictRetry(opCtx, "resharding::data_copy::insertBatch", nss, [&] { AutoGetCollection outputColl(opCtx, nss, MODE_IX); uassert(ErrorCodes::NamespaceNotFound, str::stream() << "Collection '" << nss.toStringForErrorMsg() @@ -349,7 +348,7 @@ void updateSessionRecord(OperationContext* opCtx, writeConflictRetry( opCtx, "resharding::data_copy::updateSessionRecord", - NamespaceString::kSessionTransactionsTableNamespace.ns(), + NamespaceString::kSessionTransactionsTableNamespace, [&] { AutoGetOplog oplogWrite(opCtx, OplogAccessMode::kWrite); diff --git a/src/mongo/db/s/resharding/resharding_donor_service.cpp b/src/mongo/db/s/resharding/resharding_donor_service.cpp index e3a82f24d7f..d0a81bb656d 100644 --- a/src/mongo/db/s/resharding/resharding_donor_service.cpp +++ b/src/mongo/db/s/resharding/resharding_donor_service.cpp @@ -90,7 +90,7 @@ Date_t getCurrentTime() { Timestamp generateMinFetchTimestamp(OperationContext* opCtx, const NamespaceString& sourceNss) { // Do a no-op write and use the OpTime as the minFetchTimestamp writeConflictRetry( - opCtx, "resharding donor minFetchTimestamp", NamespaceString::kRsOplogNamespace.ns(), [&] { + opCtx, "resharding donor minFetchTimestamp", NamespaceString::kRsOplogNamespace, [&] { AutoGetDb db(opCtx, sourceNss.dbName(), MODE_IX); Lock::CollectionLock collLock(opCtx, sourceNss, MODE_S); @@ -627,7 +627,7 @@ void ReshardingDonorService::DonorStateMachine:: auto oplog = generateOplogEntry(); writeConflictRetry( - rawOpCtx, "ReshardingBeginOplog", NamespaceString::kRsOplogNamespace.ns(), [&] { + rawOpCtx, "ReshardingBeginOplog", NamespaceString::kRsOplogNamespace, [&] { AutoGetOplog oplogWrite(rawOpCtx, OplogAccessMode::kWrite); WriteUnitOfWork wunit(rawOpCtx); const auto& oplogOpTime = repl::logOp(rawOpCtx, &oplog); @@ -744,7 +744,7 @@ void ReshardingDonorService::DonorStateMachine:: writeConflictRetry( rawOpCtx, "ReshardingBlockWritesOplog", - NamespaceString::kRsOplogNamespace.ns(), + NamespaceString::kRsOplogNamespace, [&] { AutoGetOplog oplogWrite(rawOpCtx, OplogAccessMode::kWrite); WriteUnitOfWork wunit(rawOpCtx); @@ -996,7 +996,7 @@ void ReshardingDonorService::DonorStateMachine::_updateDonorDocument( auto opCtx = _cancelableOpCtxFactory->makeOperationContext(&cc()); const auto& nss = NamespaceString::kDonorReshardingOperationsNamespace; - writeConflictRetry(opCtx.get(), "DonorStateMachine::_updateDonorDocument", nss.toString(), [&] { + writeConflictRetry(opCtx.get(), "DonorStateMachine::_updateDonorDocument", nss, [&] { auto coll = acquireCollection( opCtx.get(), CollectionAcquisitionRequest(NamespaceString(nss), @@ -1028,7 +1028,7 @@ void ReshardingDonorService::DonorStateMachine::_removeDonorDocument( auto opCtx = _cancelableOpCtxFactory->makeOperationContext(&cc()); const auto& nss = NamespaceString::kDonorReshardingOperationsNamespace; - writeConflictRetry(opCtx.get(), "DonorStateMachine::_removeDonorDocument", nss.toString(), [&] { + writeConflictRetry(opCtx.get(), "DonorStateMachine::_removeDonorDocument", nss, [&] { const auto coll = acquireCollection( opCtx.get(), CollectionAcquisitionRequest(NamespaceString(nss), diff --git a/src/mongo/db/s/resharding/resharding_oplog_application.cpp b/src/mongo/db/s/resharding/resharding_oplog_application.cpp index 2708c104105..c036f77f078 100644 --- a/src/mongo/db/s/resharding/resharding_oplog_application.cpp +++ b/src/mongo/db/s/resharding/resharding_oplog_application.cpp @@ -146,7 +146,7 @@ Status ReshardingOplogApplicationRules::applyOperation( invariant(!opCtx->lockState()->inAWriteUnitOfWork()); invariant(opCtx->writesAreReplicated()); - return writeConflictRetry(opCtx, "applyOplogEntryCRUDOpResharding", op.getNss().ns(), [&] { + return writeConflictRetry(opCtx, "applyOplogEntryCRUDOpResharding", op.getNss(), [&] { try { auto opType = op.getOpType(); switch (opType) { diff --git a/src/mongo/db/s/resharding/resharding_oplog_fetcher.cpp b/src/mongo/db/s/resharding/resharding_oplog_fetcher.cpp index b1e91410aab..e2a4bf22db8 100644 --- a/src/mongo/db/s/resharding/resharding_oplog_fetcher.cpp +++ b/src/mongo/db/s/resharding/resharding_oplog_fetcher.cpp @@ -242,7 +242,7 @@ void ReshardingOplogFetcher::_ensureCollection(Client* client, invariant(!opCtx->lockState()->inAWriteUnitOfWork()); // Create the destination collection if necessary. - writeConflictRetry(opCtx, "createReshardingLocalOplogBuffer", nss.toString(), [&] { + writeConflictRetry(opCtx, "createReshardingLocalOplogBuffer", nss, [&] { const Collection* coll = CollectionCatalog::get(opCtx)->lookupCollectionByNamespace(opCtx, nss); if (coll) { diff --git a/src/mongo/db/s/resharding/resharding_oplog_fetcher_test.cpp b/src/mongo/db/s/resharding/resharding_oplog_fetcher_test.cpp index 096a9292ec0..47a38bec8f5 100644 --- a/src/mongo/db/s/resharding/resharding_oplog_fetcher_test.cpp +++ b/src/mongo/db/s/resharding/resharding_oplog_fetcher_test.cpp @@ -213,7 +213,7 @@ public: } void create(NamespaceString nss) { - writeConflictRetry(_opCtx, "create", nss.ns(), [&] { + writeConflictRetry(_opCtx, "create", nss, [&] { AllowLockAcquisitionOnTimestampedUnitOfWork allowLockAcquisition(_opCtx->lockState()); AutoGetDb autoDb(_opCtx, nss.dbName(), LockMode::MODE_X); WriteUnitOfWork wunit(_opCtx); diff --git a/src/mongo/db/s/resharding/resharding_oplog_session_application.cpp b/src/mongo/db/s/resharding/resharding_oplog_session_application.cpp index 8471b6586bf..e51e549bda5 100644 --- a/src/mongo/db/s/resharding/resharding_oplog_session_application.cpp +++ b/src/mongo/db/s/resharding/resharding_oplog_session_application.cpp @@ -83,7 +83,7 @@ boost::optional<repl::OpTime> ReshardingOplogSessionApplication::_logPrePostImag return writeConflictRetry( opCtx, "ReshardingOplogSessionApplication::_logPrePostImage", - NamespaceString::kRsOplogNamespace.ns(), + NamespaceString::kRsOplogNamespace, [&] { AutoGetOplog oplogWrite(opCtx, OplogAccessMode::kWrite); diff --git a/src/mongo/db/s/resharding/resharding_recipient_service.cpp b/src/mongo/db/s/resharding/resharding_recipient_service.cpp index b67e2b28403..6328f3066fe 100644 --- a/src/mongo/db/s/resharding/resharding_recipient_service.cpp +++ b/src/mongo/db/s/resharding/resharding_recipient_service.cpp @@ -810,7 +810,7 @@ void ReshardingRecipientService::RecipientStateMachine::_writeStrictConsistencyO auto oplog = generateOplogEntry(); writeConflictRetry( - rawOpCtx, "ReshardDoneCatchUpOplog", NamespaceString::kRsOplogNamespace.ns(), [&] { + rawOpCtx, "ReshardDoneCatchUpOplog", NamespaceString::kRsOplogNamespace, [&] { AutoGetOplog oplogWrite(rawOpCtx, OplogAccessMode::kWrite); WriteUnitOfWork wunit(rawOpCtx); const auto& oplogOpTime = repl::logOp(rawOpCtx, &oplog); @@ -1120,36 +1120,34 @@ void ReshardingRecipientService::RecipientStateMachine::_removeRecipientDocument auto opCtx = factory.makeOperationContext(&cc()); const auto& nss = NamespaceString::kRecipientReshardingOperationsNamespace; - writeConflictRetry( - opCtx.get(), "RecipientStateMachine::_removeRecipientDocument", nss.toString(), [&] { - const auto coll = - acquireCollection(opCtx.get(), - CollectionAcquisitionRequest( - NamespaceString(nss), - PlacementConcern{boost::none, ShardVersion::UNSHARDED()}, - repl::ReadConcernArgs::get(opCtx.get()), - AcquisitionPrerequisites::kWrite), - MODE_IX); - - if (!coll.exists()) { - return; - } - - WriteUnitOfWork wuow(opCtx.get()); + writeConflictRetry(opCtx.get(), "RecipientStateMachine::_removeRecipientDocument", nss, [&] { + const auto coll = acquireCollection( + opCtx.get(), + CollectionAcquisitionRequest(NamespaceString(nss), + PlacementConcern{boost::none, ShardVersion::UNSHARDED()}, + repl::ReadConcernArgs::get(opCtx.get()), + AcquisitionPrerequisites::kWrite), + MODE_IX); - opCtx->recoveryUnit()->onCommit([this](OperationContext*, boost::optional<Timestamp>) { - stdx::lock_guard<Latch> lk(_mutex); - _completionPromise.emplaceValue(); - }); + if (!coll.exists()) { + return; + } - deleteObjects(opCtx.get(), - coll, - BSON(ReshardingRecipientDocument::kReshardingUUIDFieldName - << _metadata.getReshardingUUID()), - true /* justOne */); + WriteUnitOfWork wuow(opCtx.get()); - wuow.commit(); + opCtx->recoveryUnit()->onCommit([this](OperationContext*, boost::optional<Timestamp>) { + stdx::lock_guard<Latch> lk(_mutex); + _completionPromise.emplaceValue(); }); + + deleteObjects(opCtx.get(), + coll, + BSON(ReshardingRecipientDocument::kReshardingUUIDFieldName + << _metadata.getReshardingUUID()), + true /* justOne */); + + wuow.commit(); + }); } ExecutorFuture<void> ReshardingRecipientService::RecipientStateMachine::_startMetrics( diff --git a/src/mongo/db/s/resharding/resharding_util.cpp b/src/mongo/db/s/resharding/resharding_util.cpp index 6102c6ed66d..467c12c898f 100644 --- a/src/mongo/db/s/resharding/resharding_util.cpp +++ b/src/mongo/db/s/resharding/resharding_util.cpp @@ -364,7 +364,7 @@ NamespaceString getLocalConflictStashNamespace(UUID existingUUID, ShardId donorS } void doNoopWrite(OperationContext* opCtx, StringData opStr, const NamespaceString& nss) { - writeConflictRetry(opCtx, opStr, NamespaceString::kRsOplogNamespace.ns(), [&] { + writeConflictRetry(opCtx, opStr, NamespaceString::kRsOplogNamespace, [&] { AutoGetOplog oplogWrite(opCtx, OplogAccessMode::kWrite); const std::string msg = str::stream() << opStr << " on " << nss.toStringForErrorMsg(); diff --git a/src/mongo/db/s/session_catalog_migration_destination.cpp b/src/mongo/db/s/session_catalog_migration_destination.cpp index 9ea5e089ef4..7a3ca5b75bd 100644 --- a/src/mongo/db/s/session_catalog_migration_destination.cpp +++ b/src/mongo/db/s/session_catalog_migration_destination.cpp @@ -494,10 +494,7 @@ SessionCatalogMigrationDestination::_processSessionOplog(const BSONObj& oplogBSO oplogEntry.setHash(boost::none); writeConflictRetry( - opCtx, - "SessionOplogMigration", - NamespaceString::kSessionTransactionsTableNamespace.ns(), - [&] { + opCtx, "SessionOplogMigration", NamespaceString::kSessionTransactionsTableNamespace, [&] { // Need to take global lock here so repl::logOp will not unlock it and trigger the // invariant that disallows unlocking global lock while inside a WUOW. Take the // transaction table db lock to ensure the same lock ordering with normal replicated diff --git a/src/mongo/db/s/session_catalog_migration_source.cpp b/src/mongo/db/s/session_catalog_migration_source.cpp index 162d7d91aae..f10a0bdfcbc 100644 --- a/src/mongo/db/s/session_catalog_migration_source.cpp +++ b/src/mongo/db/s/session_catalog_migration_source.cpp @@ -280,7 +280,7 @@ void SessionCatalogMigrationSource::init(OperationContext* opCtx, writeConflictRetry( opCtx, "session migration initialization majority commit barrier", - NamespaceString::kRsOplogNamespace.ns(), + NamespaceString::kRsOplogNamespace, [&] { const auto message = BSON("sessionMigrateCloneStart" << _ns.ns()); diff --git a/src/mongo/db/s/sharding_index_catalog_ddl_util.cpp b/src/mongo/db/s/sharding_index_catalog_ddl_util.cpp index b83b62272a9..ea2cb71424a 100644 --- a/src/mongo/db/s/sharding_index_catalog_ddl_util.cpp +++ b/src/mongo/db/s/sharding_index_catalog_ddl_util.cpp @@ -74,7 +74,7 @@ void renameCollectionShardingIndexCatalog(OperationContext* opCtx, writeConflictRetry( opCtx, "RenameCollectionShardingIndexCatalog", - NamespaceString::kShardIndexCatalogNamespace.ns(), + NamespaceString::kShardIndexCatalogNamespace, [&]() { boost::optional<UUID> toUuid; WriteUnitOfWork wunit(opCtx); @@ -181,7 +181,7 @@ void addShardingIndexCatalogEntryToCollection(OperationContext* opCtx, indexCatalogEntry.setIndexCollectionUUID(indexCollectionUUID); writeConflictRetry( - opCtx, "AddIndexCatalogEntry", NamespaceString::kShardIndexCatalogNamespace.ns(), [&]() { + opCtx, "AddIndexCatalogEntry", NamespaceString::kShardIndexCatalogNamespace, [&]() { WriteUnitOfWork wunit(opCtx); AutoGetCollection userColl(opCtx, userCollectionNss, MODE_IX); auto acquisitions = acquireCollections( @@ -270,7 +270,7 @@ void removeShardingIndexCatalogEntryFromCollection(OperationContext* opCtx, writeConflictRetry( opCtx, "RemoveShardingIndexCatalogEntryFromCollection", - NamespaceString::kShardIndexCatalogNamespace.ns(), + NamespaceString::kShardIndexCatalogNamespace, [&]() { WriteUnitOfWork wunit(opCtx); AutoGetCollection userColl(opCtx, nss, MODE_IX); @@ -355,7 +355,7 @@ void replaceCollectionShardingIndexCatalog(OperationContext* opCtx, writeConflictRetry( opCtx, "ReplaceCollectionShardingIndexCatalog", - NamespaceString::kShardIndexCatalogNamespace.ns(), + NamespaceString::kShardIndexCatalogNamespace, [&]() { WriteUnitOfWork wunit(opCtx); AutoGetCollection userColl(opCtx, nss, MODE_IX); @@ -451,7 +451,7 @@ void dropCollectionShardingIndexCatalog(OperationContext* opCtx, const Namespace writeConflictRetry( opCtx, "DropCollectionShardingIndexCatalog", - NamespaceString::kShardIndexCatalogNamespace.ns(), + NamespaceString::kShardIndexCatalogNamespace, [&]() { boost::optional<UUID> collectionUUID; WriteUnitOfWork wunit(opCtx); @@ -516,7 +516,7 @@ void clearCollectionShardingIndexCatalog(OperationContext* opCtx, writeConflictRetry( opCtx, "ClearCollectionShardingIndexCatalog", - NamespaceString::kShardIndexCatalogNamespace.ns(), + NamespaceString::kShardIndexCatalogNamespace, [&]() { WriteUnitOfWork wunit(opCtx); AutoGetCollection userColl(opCtx, nss, MODE_IX); diff --git a/src/mongo/db/s/sharding_util.cpp b/src/mongo/db/s/sharding_util.cpp index 18f28d5041d..96eb5c5a35d 100644 --- a/src/mongo/db/s/sharding_util.cpp +++ b/src/mongo/db/s/sharding_util.cpp @@ -155,7 +155,7 @@ Status createIndexOnCollection(OperationContext* opCtx, if (!collection) { CollectionOptions options; options.uuid = UUID::gen(); - writeConflictRetry(opCtx, "createIndexOnCollection", ns.ns(), [&] { + writeConflictRetry(opCtx, "createIndexOnCollection", ns, [&] { WriteUnitOfWork wunit(opCtx); auto db = autoColl.ensureDbExists(opCtx); collection = db->createCollection(opCtx, ns, options); @@ -197,7 +197,7 @@ Status createIndexOnCollection(OperationContext* opCtx, IndexBuildsCoordinator::get(opCtx)->createIndex( opCtx, collection->uuid(), indexSpec, indexConstraints, fromMigrate); } else { - writeConflictRetry(opCtx, "createIndexOnConfigCollection", ns.ns(), [&] { + writeConflictRetry(opCtx, "createIndexOnConfigCollection", ns, [&] { WriteUnitOfWork wunit(opCtx); CollectionWriter collWriter(opCtx, collection->uuid()); IndexBuildsCoordinator::get(opCtx)->createIndexesOnEmptyCollection( diff --git a/src/mongo/db/serverless/shard_split_donor_service.cpp b/src/mongo/db/serverless/shard_split_donor_service.cpp index 0939a556a00..abb9901fc9e 100644 --- a/src/mongo/db/serverless/shard_split_donor_service.cpp +++ b/src/mongo/db/serverless/shard_split_donor_service.cpp @@ -898,111 +898,107 @@ ExecutorFuture<repl::OpTime> ShardSplitDonorService::DonorStateMachine::_updateS collection.exists()); } - writeConflictRetry( - opCtx, "ShardSplitDonorUpdateStateDoc", _stateDocumentsNS.ns(), [&]() { - WriteUnitOfWork wuow(opCtx); - - if (nextState == ShardSplitDonorStateEnum::kBlocking) { - // Start blocking writes before getting an oplog slot to guarantee no - // writes to the tenant's data can commit with a timestamp after the - // block timestamp. - auto mtabVector = - TenantMigrationAccessBlockerRegistry::get(opCtx->getServiceContext()) - .getDonorAccessBlockersForMigration(uuid); - invariant(!mtabVector.empty()); - - for (auto& mtab : mtabVector) { - invariant(mtab); - mtab->startBlockingWrites(); - - opCtx->recoveryUnit()->onRollback( - [mtab](OperationContext*) { mtab->rollBackStartBlocking(); }); + writeConflictRetry(opCtx, "ShardSplitDonorUpdateStateDoc", _stateDocumentsNS, [&]() { + WriteUnitOfWork wuow(opCtx); + + if (nextState == ShardSplitDonorStateEnum::kBlocking) { + // Start blocking writes before getting an oplog slot to guarantee no + // writes to the tenant's data can commit with a timestamp after the + // block timestamp. + auto mtabVector = + TenantMigrationAccessBlockerRegistry::get(opCtx->getServiceContext()) + .getDonorAccessBlockersForMigration(uuid); + invariant(!mtabVector.empty()); + + for (auto& mtab : mtabVector) { + invariant(mtab); + mtab->startBlockingWrites(); + + opCtx->recoveryUnit()->onRollback( + [mtab](OperationContext*) { mtab->rollBackStartBlocking(); }); + } + } + + // Reserve an opTime for the write. + auto oplogSlot = LocalOplogInfo::get(opCtx)->getNextOpTimes(opCtx, 1U)[0]; + auto updatedStateDocBson = [&]() { + stdx::lock_guard<Latch> lg(_mutex); + _stateDoc.setState(nextState); + switch (nextState) { + case ShardSplitDonorStateEnum::kUninitialized: + case ShardSplitDonorStateEnum::kAbortingIndexBuilds: + break; + case ShardSplitDonorStateEnum::kBlocking: + _stateDoc.setBlockOpTime(oplogSlot); + break; + case ShardSplitDonorStateEnum::kCommitted: + _stateDoc.setCommitOrAbortOpTime(oplogSlot); + break; + case ShardSplitDonorStateEnum::kAborted: { + _stateDoc.setCommitOrAbortOpTime(oplogSlot); + + invariant(_abortReason); + BSONObjBuilder bob; + _abortReason.value().serializeErrorToBSON(&bob); + _stateDoc.setAbortReason(bob.obj()); + break; } + default: + MONGO_UNREACHABLE; + } + if (isInsert) { + return BSON("$setOnInsert" << _stateDoc.toBSON()); } - // Reserve an opTime for the write. - auto oplogSlot = LocalOplogInfo::get(opCtx)->getNextOpTimes(opCtx, 1U)[0]; - auto updatedStateDocBson = [&]() { - stdx::lock_guard<Latch> lg(_mutex); - _stateDoc.setState(nextState); - switch (nextState) { - case ShardSplitDonorStateEnum::kUninitialized: - case ShardSplitDonorStateEnum::kAbortingIndexBuilds: - break; - case ShardSplitDonorStateEnum::kBlocking: - _stateDoc.setBlockOpTime(oplogSlot); - break; - case ShardSplitDonorStateEnum::kCommitted: - _stateDoc.setCommitOrAbortOpTime(oplogSlot); - break; - case ShardSplitDonorStateEnum::kAborted: { - _stateDoc.setCommitOrAbortOpTime(oplogSlot); - - invariant(_abortReason); - BSONObjBuilder bob; - _abortReason.value().serializeErrorToBSON(&bob); - _stateDoc.setAbortReason(bob.obj()); - break; - } - default: - MONGO_UNREACHABLE; - } - if (isInsert) { - return BSON("$setOnInsert" << _stateDoc.toBSON()); - } + return _stateDoc.toBSON(); + }(); - return _stateDoc.toBSON(); - }(); - - auto updateOpTime = [&]() { - if (isInsert) { - const auto filter = - BSON(ShardSplitDonorDocument::kIdFieldName << uuid); - auto updateResult = Helpers::upsert(opCtx, - collection, - filter, - updatedStateDocBson, - /*fromMigrate=*/false); - - // '$setOnInsert' update operator can never modify an existing - // on-disk state doc. - invariant(!updateResult.existing); - invariant(!updateResult.numDocsModified); - - return repl::ReplClientInfo::forClient(opCtx->getClient()) - .getLastOp(); - } + auto updateOpTime = [&]() { + if (isInsert) { + const auto filter = BSON(ShardSplitDonorDocument::kIdFieldName << uuid); + auto updateResult = Helpers::upsert(opCtx, + collection, + filter, + updatedStateDocBson, + /*fromMigrate=*/false); - const auto originalRecordId = - Helpers::findOne(opCtx, - collection.getCollectionPtr(), - BSON("_id" << originalStateDocBson["_id"])); - const auto originalSnapshot = Snapshotted<BSONObj>( - opCtx->recoveryUnit()->getSnapshotId(), originalStateDocBson); - invariant(!originalRecordId.isNull()); - - CollectionUpdateArgs args{originalSnapshot.value()}; - args.criteria = BSON("_id" << uuid); - args.oplogSlots = {oplogSlot}; - args.update = updatedStateDocBson; - - collection_internal::updateDocument( - opCtx, - collection.getCollectionPtr(), - originalRecordId, - originalSnapshot, - updatedStateDocBson, - collection_internal::kUpdateNoIndexes, - nullptr /* indexesAffected */, - nullptr /* OpDebug* */, - &args); - - return oplogSlot; - }(); - - wuow.commit(); - return updateOpTime; - }); + // '$setOnInsert' update operator can never modify an existing + // on-disk state doc. + invariant(!updateResult.existing); + invariant(!updateResult.numDocsModified); + + return repl::ReplClientInfo::forClient(opCtx->getClient()).getLastOp(); + } + + const auto originalRecordId = + Helpers::findOne(opCtx, + collection.getCollectionPtr(), + BSON("_id" << originalStateDocBson["_id"])); + const auto originalSnapshot = Snapshotted<BSONObj>( + opCtx->recoveryUnit()->getSnapshotId(), originalStateDocBson); + invariant(!originalRecordId.isNull()); + + CollectionUpdateArgs args{originalSnapshot.value()}; + args.criteria = BSON("_id" << uuid); + args.oplogSlots = {oplogSlot}; + args.update = updatedStateDocBson; + + collection_internal::updateDocument(opCtx, + collection.getCollectionPtr(), + originalRecordId, + originalSnapshot, + updatedStateDocBson, + collection_internal::kUpdateNoIndexes, + nullptr /* indexesAffected */, + nullptr /* OpDebug* */, + &args); + + return oplogSlot; + }(); + + wuow.commit(); + return updateOpTime; + }); return repl::ReplClientInfo::forClient(opCtx->getClient()).getLastOp(); }) diff --git a/src/mongo/db/serverless/shard_split_utils.cpp b/src/mongo/db/serverless/shard_split_utils.cpp index 7afdd4e6d8e..bd65cfd282f 100644 --- a/src/mongo/db/serverless/shard_split_utils.cpp +++ b/src/mongo/db/serverless/shard_split_utils.cpp @@ -167,7 +167,7 @@ Status insertStateDoc(OperationContext* opCtx, const ShardSplitDonorDocument& st " state document", repl::ReplicationCoordinator::get(opCtx)->canAcceptWritesFor(opCtx, nss)); - return writeConflictRetry(opCtx, "insertShardSplitStateDoc", nss.ns(), [&]() -> Status { + return writeConflictRetry(opCtx, "insertShardSplitStateDoc", nss, [&]() -> Status { const auto filter = BSON(ShardSplitDonorDocument::kIdFieldName << stateDoc.getId() << ShardSplitDonorDocument::kExpireAtFieldName << BSON("$exists" << false)); @@ -200,7 +200,7 @@ Status updateStateDoc(OperationContext* opCtx, const ShardSplitDonorDocument& st str::stream() << nss.toStringForErrorMsg() << " does not exist"); } - return writeConflictRetry(opCtx, "updateShardSplitStateDoc", nss.ns(), [&]() -> Status { + return writeConflictRetry(opCtx, "updateShardSplitStateDoc", nss, [&]() -> Status { auto updateResult = Helpers::upsert(opCtx, collection, stateDoc.toBSON(), /*fromMigrate=*/false); if (updateResult.numMatched == 0) { @@ -228,7 +228,7 @@ StatusWith<bool> deleteStateDoc(OperationContext* opCtx, const UUID& shardSplitI str::stream() << nss.toStringForErrorMsg() << " does not exist"); } auto query = BSON(ShardSplitDonorDocument::kIdFieldName << shardSplitId); - return writeConflictRetry(opCtx, "ShardSplitDonorDeleteStateDoc", nss.ns(), [&]() -> bool { + return writeConflictRetry(opCtx, "ShardSplitDonorDeleteStateDoc", nss, [&]() -> bool { auto nDeleted = deleteObjects(opCtx, collection, query, true /* justOne */); return nDeleted > 0; }); diff --git a/src/mongo/db/startup_recovery.cpp b/src/mongo/db/startup_recovery.cpp index 77c57e7e1a6..55eba64e224 100644 --- a/src/mongo/db/startup_recovery.cpp +++ b/src/mongo/db/startup_recovery.cpp @@ -121,7 +121,7 @@ Status restoreMissingFeatureCompatibilityVersionDocument(OperationContext* opCtx // (Generic FCV reference): This FCV reference should exist across LTS binary versions. fcvDoc.setVersion(multiversion::GenericFCV::kLastLTS); - writeConflictRetry(opCtx, "insertFCVDocument", fcvNss.ns(), [&] { + writeConflictRetry(opCtx, "insertFCVDocument", fcvNss, [&] { WriteUnitOfWork wunit(opCtx); uassertStatusOK(collection_internal::insertDocument( opCtx, fcvColl, InsertStatement(fcvDoc.toBSON()), nullptr /* OpDebug */, false)); diff --git a/src/mongo/db/storage/kv/kv_drop_pending_ident_reaper.cpp b/src/mongo/db/storage/kv/kv_drop_pending_ident_reaper.cpp index 4b97397043e..b5690b2904f 100644 --- a/src/mongo/db/storage/kv/kv_drop_pending_ident_reaper.cpp +++ b/src/mongo/db/storage/kv/kv_drop_pending_ident_reaper.cpp @@ -161,7 +161,7 @@ void KVDropPendingIdentReaper::dropIdentsOlderThan(OperationContext* opCtx, cons for (auto& timestampAndIdentInfo : toDrop) { // Guards against catalog changes while dropping idents using KVEngine::dropIdent(). Yields // after dropping each ident. - writeConflictRetry(opCtx, "dropIdentsOlderThan", "", [&] { + writeConflictRetry(opCtx, "dropIdentsOlderThan", NamespaceString(), [&] { Lock::GlobalLock globalLock(opCtx, MODE_IX); const auto& dropTimestamp = timestampAndIdentInfo.first; diff --git a/src/mongo/db/storage/storage_util.h b/src/mongo/db/storage/storage_util.h index 00b79048e2f..3efc18aac65 100644 --- a/src/mongo/db/storage/storage_util.h +++ b/src/mongo/db/storage/storage_util.h @@ -123,7 +123,7 @@ Status insertBatchAndHandleRetry(OperationContext* opCtx, // Try to insert the batch one-at-a-time because the batch failed all-at-once inserting. for (auto it = docs.cbegin(); it != docs.cend(); ++it) { - auto status = writeConflictRetry(opCtx, "batchInsertDocuments", nsOrUUID.toString(), [&] { + auto status = writeConflictRetry(opCtx, "batchInsertDocuments", nsOrUUID, [&] { auto status = insertFn(opCtx, it, it + 1); if (!status.isOK()) { return status; diff --git a/src/mongo/db/storage/wiredtiger/wiredtiger_record_store.cpp b/src/mongo/db/storage/wiredtiger/wiredtiger_record_store.cpp index 4286d32c2c3..39280fbe53a 100644 --- a/src/mongo/db/storage/wiredtiger/wiredtiger_record_store.cpp +++ b/src/mongo/db/storage/wiredtiger/wiredtiger_record_store.cpp @@ -1085,10 +1085,11 @@ StatusWith<Timestamp> WiredTigerRecordStore::getLatestOplogTimestamp( } }); - WT_CURSOR* cursor = writeConflictRetry(opCtx, "getLatestOplogTimestamp", "local.oplog.rs", [&] { - auto cachedCursor = session->getCachedCursor(_tableId, ""); - return cachedCursor ? cachedCursor : session->getNewCursor(_uri); - }); + WT_CURSOR* cursor = writeConflictRetry( + opCtx, "getLatestOplogTimestamp", NamespaceString::kRsOplogNamespace, [&] { + auto cachedCursor = session->getCachedCursor(_tableId, ""); + return cachedCursor ? cachedCursor : session->getNewCursor(_uri); + }); ON_BLOCK_EXIT([&] { session->releaseCursor(_tableId, cursor, ""); }); int ret = cursor->prev(cursor); if (ret == WT_NOTFOUND) { @@ -1114,8 +1115,8 @@ StatusWith<Timestamp> WiredTigerRecordStore::getEarliestOplogTimestamp(Operation if (firstRecordTimestamp == Timestamp()) { WiredTigerSessionCache* cache = WiredTigerRecoveryUnit::get(opCtx)->getSessionCache(); auto sessRaii = cache->getSession(); - WT_CURSOR* cursor = - writeConflictRetry(opCtx, "getEarliestOplogTimestamp", "local.oplog.rs", [&] { + WT_CURSOR* cursor = writeConflictRetry( + opCtx, "getEarliestOplogTimestamp", NamespaceString::kRsOplogNamespace, [&] { auto cachedCursor = sessRaii->getCachedCursor(_tableId, ""); return cachedCursor ? cachedCursor : sessRaii->getNewCursor(_uri); }); diff --git a/src/mongo/db/transaction/transaction_participant.cpp b/src/mongo/db/transaction/transaction_participant.cpp index c961a63ee6b..2bba3023e3c 100644 --- a/src/mongo/db/transaction/transaction_participant.cpp +++ b/src/mongo/db/transaction/transaction_participant.cpp @@ -556,7 +556,7 @@ void TransactionParticipant::performNoopWrite(OperationContext* opCtx, StringDat replCoord->canAcceptWritesForDatabase(opCtx, DatabaseName::kAdmin)); writeConflictRetry( - opCtx, "performNoopWrite", NamespaceString::kRsOplogNamespace.ns(), [&opCtx, &msg] { + opCtx, "performNoopWrite", NamespaceString::kRsOplogNamespace, [&opCtx, &msg] { WriteUnitOfWork wuow(opCtx); opCtx->getClient()->getServiceContext()->getOpObserver()->onOpMessage( opCtx, BSON("msg" << msg)); diff --git a/src/mongo/dbtests/repltests.cpp b/src/mongo/dbtests/repltests.cpp index 8e181031f98..4396bed64a2 100644 --- a/src/mongo/dbtests/repltests.cpp +++ b/src/mongo/dbtests/repltests.cpp @@ -36,6 +36,7 @@ #include "mongo/db/db_raii.h" #include "mongo/db/dbdirectclient.h" #include "mongo/db/json.h" +#include "mongo/db/namespace_string.h" #include "mongo/db/op_observer/op_observer_impl.h" #include "mongo/db/op_observer/oplog_writer_impl.h" #include "mongo/db/ops/update.h" @@ -170,7 +171,7 @@ protected: return "unittests.repltests"; } static NamespaceString nss() { - return NamespaceString(ns()); + return NamespaceString::createNamespaceString_forTest(ns()); } static const char* cllNS() { return "local.oplog.rs"; @@ -265,8 +266,8 @@ protected: } // These deletes don't get logged. void deleteAll(const char* ns) const { - ::mongo::writeConflictRetry(&_opCtx, "deleteAll", ns, [&] { - NamespaceString nss(ns); + NamespaceString nss(ns); + ::mongo::writeConflictRetry(&_opCtx, "deleteAll", nss, [&] { Lock::GlobalWrite lk(&_opCtx); OldClientContext ctx(&_opCtx, nss); WriteUnitOfWork wunit(&_opCtx); |