diff options
author | wenqinYe <wenqin908@gmail.com> | 2022-11-28 22:25:48 +0000 |
---|---|---|
committer | Evergreen Agent <no-reply@evergreen.mongodb.com> | 2022-11-28 23:05:01 +0000 |
commit | ee8a38cc92b5ff1e39ff2e702a17e25d29280409 (patch) | |
tree | 9a9bd3a5e9099f9443fc8e7b5dabdc47cf58b869 | |
parent | 869c7519c280bc999243eb334747e842ed2821f6 (diff) | |
download | mongo-ee8a38cc92b5ff1e39ff2e702a17e25d29280409.tar.gz |
SERVER-71544 Fix race condition on _sessionCatalogSource in LogOpShardingHandler::commit
7 files changed, 79 insertions, 21 deletions
diff --git a/src/mongo/db/s/migration_chunk_cloner_source_legacy.cpp b/src/mongo/db/s/migration_chunk_cloner_source_legacy.cpp index 6607d2b2f33..cfaabdef023 100644 --- a/src/mongo/db/s/migration_chunk_cloner_source_legacy.cpp +++ b/src/mongo/db/s/migration_chunk_cloner_source_legacy.cpp @@ -288,6 +288,7 @@ void LogTransactionOperationsForShardingHandler::commit(OperationContext* opCtx, } MigrationChunkClonerSourceLegacy::MigrationChunkClonerSourceLegacy( + OperationContext* opCtx, const ShardsvrMoveRange& request, const WriteConcernOptions& writeConcern, const BSONObj& shardKeyPattern, @@ -300,7 +301,13 @@ MigrationChunkClonerSourceLegacy::MigrationChunkClonerSourceLegacy( _args.getToShard().toString())), _donorConnStr(std::move(donorConnStr)), _recipientHost(std::move(recipientHost)), - _forceJumbo(_args.getForceJumbo() != ForceJumbo::kDoNotForce) {} + _forceJumbo(_args.getForceJumbo() != ForceJumbo::kDoNotForce) { + auto const replCoord = repl::ReplicationCoordinator::get(opCtx); + if (replCoord->getReplicationMode() == repl::ReplicationCoordinator::modeReplSet) { + _sessionCatalogSource = std::make_unique<SessionCatalogMigrationSource>( + opCtx, nss(), ChunkRange(getMin(), getMax()), _shardKeyPattern.getKeyPattern()); + } +} MigrationChunkClonerSourceLegacy::~MigrationChunkClonerSourceLegacy() { invariant(_state == kDone); @@ -313,10 +320,8 @@ Status MigrationChunkClonerSourceLegacy::startClone(OperationContext* opCtx, invariant(_state == kNew); invariant(!opCtx->lockState()->isLocked()); - auto const replCoord = repl::ReplicationCoordinator::get(opCtx); - if (replCoord->getReplicationMode() == repl::ReplicationCoordinator::modeReplSet) { - _sessionCatalogSource = std::make_unique<SessionCatalogMigrationSource>( - opCtx, nss(), ChunkRange(getMin(), getMax()), _shardKeyPattern.getKeyPattern()); + if (_sessionCatalogSource) { + _sessionCatalogSource->init(opCtx); // Prime up the session migration source if there are oplog entries to migrate. _sessionCatalogSource->fetchNextOplog(opCtx); @@ -592,10 +597,8 @@ void MigrationChunkClonerSourceLegacy::onDeleteOp(OperationContext* opCtx, void MigrationChunkClonerSourceLegacy::_addToSessionMigrationOptimeQueue( const repl::OpTime& opTime, SessionCatalogMigrationSource::EntryAtOpTimeType entryAtOpTimeType) { - if (auto sessionSource = _sessionCatalogSource.get()) { - if (!opTime.isNull()) { - sessionSource->notifyNewWriteOpTime(opTime, entryAtOpTimeType); - } + if (_sessionCatalogSource && !opTime.isNull()) { + _sessionCatalogSource->notifyNewWriteOpTime(opTime, entryAtOpTimeType); } } diff --git a/src/mongo/db/s/migration_chunk_cloner_source_legacy.h b/src/mongo/db/s/migration_chunk_cloner_source_legacy.h index 02a9a04c266..8dfd01e45cb 100644 --- a/src/mongo/db/s/migration_chunk_cloner_source_legacy.h +++ b/src/mongo/db/s/migration_chunk_cloner_source_legacy.h @@ -88,7 +88,8 @@ class MigrationChunkClonerSourceLegacy final : public MigrationChunkClonerSource MigrationChunkClonerSourceLegacy& operator=(const MigrationChunkClonerSourceLegacy&) = delete; public: - MigrationChunkClonerSourceLegacy(const ShardsvrMoveRange& request, + MigrationChunkClonerSourceLegacy(OperationContext* opCtx, + const ShardsvrMoveRange& request, const WriteConcernOptions& writeConcern, const BSONObj& shardKeyPattern, ConnectionString donorConnStr, @@ -343,7 +344,7 @@ private: std::unique_ptr<SessionCatalogMigrationSource> _sessionCatalogSource; // Protects the entries below - Mutex _mutex = MONGO_MAKE_LATCH("MigrationChunkClonerSourceLegacy::_mutex"); + mutable Mutex _mutex = MONGO_MAKE_LATCH("MigrationChunkClonerSourceLegacy::_mutex"); // The current state of the cloner State _state{kNew}; @@ -386,7 +387,6 @@ private: // False if the move chunk request specified ForceJumbo::kDoNotForce, true otherwise. const bool _forceJumbo; - struct JumboChunkCloneState { // Plan executor for collection scan used to clone docs. std::unique_ptr<PlanExecutor, PlanExecutor::Deleter> clonerExec; diff --git a/src/mongo/db/s/migration_chunk_cloner_source_legacy_test.cpp b/src/mongo/db/s/migration_chunk_cloner_source_legacy_test.cpp index b43c52d607d..1a2b2427172 100644 --- a/src/mongo/db/s/migration_chunk_cloner_source_legacy_test.cpp +++ b/src/mongo/db/s/migration_chunk_cloner_source_legacy_test.cpp @@ -276,7 +276,8 @@ TEST_F(MigrationChunkClonerSourceLegacyTest, CorrectDocumentsFetched) { const ShardsvrMoveRange req = createMoveRangeRequest(ChunkRange(BSON("X" << 100), BSON("X" << 200))); - MigrationChunkClonerSourceLegacy cloner(req, + MigrationChunkClonerSourceLegacy cloner(operationContext(), + req, WriteConcernOptions(), kShardKeyPattern, kDonorConnStr, @@ -387,7 +388,8 @@ TEST_F(MigrationChunkClonerSourceLegacyTest, RemoveDuplicateDocuments) { const ShardsvrMoveRange req = createMoveRangeRequest(ChunkRange(BSON("X" << 100), BSON("X" << 200))); - MigrationChunkClonerSourceLegacy cloner(req, + MigrationChunkClonerSourceLegacy cloner(operationContext(), + req, WriteConcernOptions(), kShardKeyPattern, kDonorConnStr, @@ -481,7 +483,8 @@ TEST_F(MigrationChunkClonerSourceLegacyTest, OneLargeDocumentTransferMods) { const ShardsvrMoveRange req = createMoveRangeRequest(ChunkRange(BSON("X" << 1), BSON("X" << 100))); - MigrationChunkClonerSourceLegacy cloner(req, + MigrationChunkClonerSourceLegacy cloner(operationContext(), + req, WriteConcernOptions(), kShardKeyPattern, kDonorConnStr, @@ -543,7 +546,8 @@ TEST_F(MigrationChunkClonerSourceLegacyTest, ManySmallDocumentsTransferMods) { const ShardsvrMoveRange req = createMoveRangeRequest(ChunkRange(BSON("X" << 1), BSON("X" << 1000000))); - MigrationChunkClonerSourceLegacy cloner(req, + MigrationChunkClonerSourceLegacy cloner(operationContext(), + req, WriteConcernOptions(), kShardKeyPattern, kDonorConnStr, @@ -616,7 +620,8 @@ TEST_F(MigrationChunkClonerSourceLegacyTest, ManySmallDocumentsTransferMods) { TEST_F(MigrationChunkClonerSourceLegacyTest, CollectionNotFound) { const ShardsvrMoveRange req = createMoveRangeRequest(ChunkRange(BSON("X" << 100), BSON("X" << 200))); - MigrationChunkClonerSourceLegacy cloner(req, + MigrationChunkClonerSourceLegacy cloner(operationContext(), + req, WriteConcernOptions(), kShardKeyPattern, kDonorConnStr, @@ -636,7 +641,8 @@ TEST_F(MigrationChunkClonerSourceLegacyTest, ShardKeyIndexNotFound) { const ShardsvrMoveRange req = createMoveRangeRequest(ChunkRange(BSON("X" << 100), BSON("X" << 200))); - MigrationChunkClonerSourceLegacy cloner(req, + MigrationChunkClonerSourceLegacy cloner(operationContext(), + req, WriteConcernOptions(), kShardKeyPattern, kDonorConnStr, @@ -656,7 +662,8 @@ TEST_F(MigrationChunkClonerSourceLegacyTest, FailedToEngageRecipientShard) { const ShardsvrMoveRange req = createMoveRangeRequest(ChunkRange(BSON("X" << 100), BSON("X" << 200))); - MigrationChunkClonerSourceLegacy cloner(req, + MigrationChunkClonerSourceLegacy cloner(operationContext(), + req, WriteConcernOptions(), kShardKeyPattern, kDonorConnStr, diff --git a/src/mongo/db/s/migration_source_manager.cpp b/src/mongo/db/s/migration_source_manager.cpp index da33366e8a8..d7b77300b3c 100644 --- a/src/mongo/db/s/migration_source_manager.cpp +++ b/src/mongo/db/s/migration_source_manager.cpp @@ -288,7 +288,7 @@ void MigrationSourceManager::startClone() { // migration, write operations require the cloner to be present in order to track changes to // the chunk which needs to be transmitted to the recipient. _cloneDriver = std::make_shared<MigrationChunkClonerSourceLegacy>( - _args, _writeConcern, metadata.getKeyPattern(), _donorConnStr, _recipientHost); + _opCtx, _args, _writeConcern, metadata.getKeyPattern(), _donorConnStr, _recipientHost); _coordinator.emplace(_cloneDriver->getSessionId(), _args.getFromShard(), diff --git a/src/mongo/db/s/session_catalog_migration_source.cpp b/src/mongo/db/s/session_catalog_migration_source.cpp index b6f0f93f4ed..d5b03f4d5e4 100644 --- a/src/mongo/db/s/session_catalog_migration_source.cpp +++ b/src/mongo/db/s/session_catalog_migration_source.cpp @@ -213,8 +213,9 @@ SessionCatalogMigrationSource::SessionCatalogMigrationSource(OperationContext* o : _ns(std::move(ns)), _rollbackIdAtInit(repl::ReplicationProcess::get(opCtx)->getRollbackID()), _chunkRange(std::move(chunk)), - _keyPattern(shardKey) { + _keyPattern(shardKey) {} +void SessionCatalogMigrationSource::init(OperationContext* opCtx) { DBDirectClient client(opCtx); FindCommandRequest findRequest{NamespaceString::kSessionTransactionsTableNamespace}; // Skip internal sessions for retryable writes with aborted or in progress transactions since diff --git a/src/mongo/db/s/session_catalog_migration_source.h b/src/mongo/db/s/session_catalog_migration_source.h index 503b718eeaa..2f826b369c8 100644 --- a/src/mongo/db/s/session_catalog_migration_source.h +++ b/src/mongo/db/s/session_catalog_migration_source.h @@ -95,6 +95,13 @@ public: KeyPattern shardKey); /** + * Gets the session oplog entries to be sent to the destination. The initialization is separated + * from the constructor to allow the member functions of the SessionCatalogMigrationSource to be + * called before the initialization step is finished. + */ + void init(OperationContext* opCtx); + + /** * Returns true if there are more oplog entries to fetch at this moment. Note that new writes * can still continue to come in after this has returned false, so it can become true again. * Once this has returned false, this means that it has depleted the existing buffer so it diff --git a/src/mongo/db/s/session_catalog_migration_source_test.cpp b/src/mongo/db/s/session_catalog_migration_source_test.cpp index 49c40b93d3c..33baf246cba 100644 --- a/src/mongo/db/s/session_catalog_migration_source_test.cpp +++ b/src/mongo/db/s/session_catalog_migration_source_test.cpp @@ -307,6 +307,7 @@ TEST_F(SessionCatalogMigrationSourceTest, OneSessionWithTwoWrites) { client.insert(NamespaceString::kSessionTransactionsTableNamespace.ns(), sessionRecord.toBSON()); SessionCatalogMigrationSource migrationSource(opCtx(), kNs, kChunkRange, kShardKey); + migrationSource.init(opCtx()); ASSERT_TRUE(migrationSource.fetchNextOplog(opCtx())); { @@ -368,6 +369,7 @@ TEST_F(SessionCatalogMigrationSourceTest, OneSessionWithTwoWritesMultiStmtIds) { client.insert(NamespaceString::kSessionTransactionsTableNamespace.ns(), sessionRecord.toBSON()); SessionCatalogMigrationSource migrationSource(opCtx(), kNs, kChunkRange, kShardKey); + migrationSource.init(opCtx()); ASSERT_TRUE(migrationSource.fetchNextOplog(opCtx())); { @@ -466,6 +468,7 @@ TEST_F(SessionCatalogMigrationSourceTest, TwoSessionWithTwoWrites) { insertOplogEntry(entry2b); SessionCatalogMigrationSource migrationSource(opCtx(), kNs, kChunkRange, kShardKey); + migrationSource.init(opCtx()); ASSERT_TRUE(migrationSource.fetchNextOplog(opCtx())); auto checkNextBatch = [this, &migrationSource](const repl::OplogEntry& firstExpectedOplog, @@ -575,6 +578,7 @@ TEST_F(SessionCatalogMigrationSourceTest, OneSessionWithFindAndModifyPreImageAnd client.insert(NamespaceString::kSessionTransactionsTableNamespace.ns(), sessionRecord.toBSON()); SessionCatalogMigrationSource migrationSource(opCtx(), kNs, kChunkRange, kShardKey); + migrationSource.init(opCtx()); ASSERT_TRUE(migrationSource.fetchNextOplog(opCtx())); auto expectedSequence = {entry3, entry4, entry1, entry2}; @@ -657,6 +661,7 @@ TEST_F(SessionCatalogMigrationSourceTest, client.insert(NamespaceString::kSessionTransactionsTableNamespace.ns(), sessionRecord.toBSON()); SessionCatalogMigrationSource migrationSource(opCtx(), kNs, kChunkRange, kShardKey); + migrationSource.init(opCtx()); ASSERT_TRUE(migrationSource.fetchNextOplog(opCtx())); auto expectedSequence = {entry3, entry4, entry1, entry2}; @@ -713,6 +718,7 @@ TEST_F(SessionCatalogMigrationSourceTest, ForgeImageEntriesWhenFetchingEntriesWi client.insert(NamespaceString::kSessionTransactionsTableNamespace.ns(), sessionRecord.toBSON()); SessionCatalogMigrationSource migrationSource(opCtx(), kNs, kChunkRange, kShardKey); + migrationSource.init(opCtx()); // The next oplog entry should be the forged preImage entry. ASSERT_TRUE(migrationSource.fetchNextOplog(opCtx())); ASSERT_TRUE(migrationSource.hasMoreOplog()); @@ -791,6 +797,7 @@ TEST_F(SessionCatalogMigrationSourceTest, OplogWithOtherNsShouldBeIgnored) { sessionRecord2.toBSON()); SessionCatalogMigrationSource migrationSource(opCtx(), kNs, kChunkRange, kShardKey); + migrationSource.init(opCtx()); ASSERT_TRUE(migrationSource.fetchNextOplog(opCtx())); ASSERT_TRUE(migrationSource.hasMoreOplog()); @@ -857,6 +864,7 @@ TEST_F(SessionCatalogMigrationSourceTest, SessionDumpWithMultipleNewWrites) { insertOplogEntry(entry3); SessionCatalogMigrationSource migrationSource(opCtx(), kNs, kChunkRange, kShardKey); + migrationSource.init(opCtx()); ASSERT_TRUE(migrationSource.fetchNextOplog(opCtx())); migrationSource.notifyNewWriteOpTime( @@ -894,6 +902,7 @@ TEST_F(SessionCatalogMigrationSourceTest, SessionDumpWithMultipleNewWrites) { TEST_F(SessionCatalogMigrationSourceTest, ShouldAssertIfOplogCannotBeFound) { SessionCatalogMigrationSource migrationSource(opCtx(), kNs, kChunkRange, kShardKey); + migrationSource.init(opCtx()); ASSERT_FALSE(migrationSource.fetchNextOplog(opCtx())); migrationSource.notifyNewWriteOpTime( @@ -906,6 +915,7 @@ TEST_F(SessionCatalogMigrationSourceTest, ShouldAssertIfOplogCannotBeFound) { TEST_F(SessionCatalogMigrationSourceTest, ReturnDeadEndSentinelOplogEntryForNewCommittedNonInternalTransaction) { SessionCatalogMigrationSource migrationSource(opCtx(), kNs, kChunkRange, kShardKey); + migrationSource.init(opCtx()); ASSERT_FALSE(migrationSource.fetchNextOplog(opCtx())); const auto sessionId = makeLogicalSessionIdForTest(); @@ -944,6 +954,7 @@ DEATH_TEST_F(SessionCatalogMigrationSourceTest, ThrowUponSeeingNewCommittedForInternalTransactionForNonRetryableWrite, "Cannot add op time for a non-retryable internal transaction") { SessionCatalogMigrationSource migrationSource(opCtx(), kNs, kChunkRange, kShardKey); + migrationSource.init(opCtx()); ASSERT_FALSE(migrationSource.fetchNextOplog(opCtx())); const auto sessionId = makeLogicalSessionIdWithTxnUUIDForTest(); @@ -969,6 +980,7 @@ DEATH_TEST_F(SessionCatalogMigrationSourceTest, TEST_F(SessionCatalogMigrationSourceTest, DeriveOplogEntriesForNewCommittedInternalTransactionForRetryableWriteBasic) { SessionCatalogMigrationSource migrationSource(opCtx(), kNs, kChunkRange, kShardKey); + migrationSource.init(opCtx()); ASSERT_FALSE(migrationSource.fetchNextOplog(opCtx())); const auto sessionId = makeLogicalSessionIdWithTxnNumberAndUUIDForTest(); @@ -1041,6 +1053,7 @@ TEST_F(SessionCatalogMigrationSourceTest, TEST_F(SessionCatalogMigrationSourceTest, DeriveOplogEntriesForNewCommittedInternalTransactionForRetryableWriteFetchPrePostImage) { SessionCatalogMigrationSource migrationSource(opCtx(), kNs, kChunkRange, kShardKey); + migrationSource.init(opCtx()); ASSERT_FALSE(migrationSource.fetchNextOplog(opCtx())); const auto sessionId = makeLogicalSessionIdWithTxnNumberAndUUIDForTest(); @@ -1143,6 +1156,7 @@ TEST_F(SessionCatalogMigrationSourceTest, TEST_F(SessionCatalogMigrationSourceTest, DeriveOplogEntriesForNewCommittedInternalTransactionForRetryableWriteForgePrePostImage) { SessionCatalogMigrationSource migrationSource(opCtx(), kNs, kChunkRange, kShardKey); + migrationSource.init(opCtx()); ASSERT_FALSE(migrationSource.fetchNextOplog(opCtx())); std::vector<repl::RetryImageEnum> cases{repl::RetryImageEnum::kPreImage, @@ -1232,6 +1246,7 @@ TEST_F(SessionCatalogMigrationSourceTest, ShouldBeAbleInsertNewWritesAfterBuffer const auto txnNumber = TxnNumber{1}; SessionCatalogMigrationSource migrationSource(opCtx(), kNs, kChunkRange, kShardKey); + migrationSource.init(opCtx()); ASSERT_FALSE(migrationSource.fetchNextOplog(opCtx())); { @@ -1340,6 +1355,7 @@ TEST_F(SessionCatalogMigrationSourceTest, ReturnsDeadEndSentinelForIncompleteHis client.insert(NamespaceString::kSessionTransactionsTableNamespace.ns(), sessionRecord.toBSON()); SessionCatalogMigrationSource migrationSource(opCtx(), kNs, kChunkRange, kShardKey); + migrationSource.init(opCtx()); ASSERT_TRUE(migrationSource.fetchNextOplog(opCtx())); { @@ -1400,6 +1416,7 @@ TEST_F(SessionCatalogMigrationSourceTest, ShouldAssertWhenRollbackDetected) { client.insert(NamespaceString::kSessionTransactionsTableNamespace.ns(), sessionRecord.toBSON()); SessionCatalogMigrationSource migrationSource(opCtx(), kNs, kChunkRange, kShardKey); + migrationSource.init(opCtx()); ASSERT_TRUE(migrationSource.fetchNextOplog(opCtx())); { @@ -1443,6 +1460,7 @@ TEST_F(SessionCatalogMigrationSourceTest, client.insert(NamespaceString::kSessionTransactionsTableNamespace.ns(), txnRecord.toBSON()); SessionCatalogMigrationSource migrationSource(opCtx(), kNs, kChunkRange, kShardKey); + migrationSource.init(opCtx()); ASSERT_TRUE(migrationSource.fetchNextOplog(opCtx())); ASSERT_TRUE(migrationSource.hasMoreOplog()); @@ -1482,6 +1500,7 @@ TEST_F(SessionCatalogMigrationSourceTest, IgnoreCommittedInternalTransactionForN client.insert(NamespaceString::kSessionTransactionsTableNamespace.ns(), txnRecord.toBSON()); SessionCatalogMigrationSource migrationSource(opCtx(), kNs, kChunkRange, kShardKey); + migrationSource.init(opCtx()); ASSERT_FALSE(migrationSource.fetchNextOplog(opCtx())); ASSERT_FALSE(migrationSource.hasMoreOplog()); @@ -1566,6 +1585,7 @@ TEST_F(SessionCatalogMigrationSourceTest, client.insert(NamespaceString::kSessionTransactionsTableNamespace.ns(), txnRecord.toBSON()); SessionCatalogMigrationSource migrationSource(opCtx(), kNs, kChunkRange, kShardKey); + migrationSource.init(opCtx()); const auto expectedSessionId = *getParentSessionId(sessionId); const auto expectedTxnNumber = *sessionId.getTxnNumber(); @@ -1690,6 +1710,7 @@ TEST_F(SessionCatalogMigrationSourceTest, client.insert(NamespaceString::kSessionTransactionsTableNamespace.ns(), txnRecord.toBSON()); SessionCatalogMigrationSource migrationSource(opCtx(), kNs, kChunkRange, kShardKey); + migrationSource.init(opCtx()); const auto expectedSessionId = *getParentSessionId(sessionId); const auto expectedTxnNumber = *sessionId.getTxnNumber(); @@ -1782,6 +1803,7 @@ TEST_F(SessionCatalogMigrationSourceTest, // Create a SessionCatalogMigrationSource. It should return only the oplog entry for the // internal session with the latest txnNumber. SessionCatalogMigrationSource migrationSource(opCtx(), kNs, kChunkRange, kShardKey); + migrationSource.init(opCtx()); ASSERT_TRUE(migrationSource.fetchNextOplog(opCtx())); ASSERT_TRUE(migrationSource.hasMoreOplog()); @@ -1819,6 +1841,7 @@ TEST_F(SessionCatalogMigrationSourceTest, // Create another SessionCatalogMigrationSource. It should still return only the oplog entry // for the internal session with the latest txnNumber. SessionCatalogMigrationSource migrationSource(opCtx(), kNs, kChunkRange, kShardKey); + migrationSource.init(opCtx()); ASSERT_TRUE(migrationSource.fetchNextOplog(opCtx())); ASSERT_TRUE(migrationSource.hasMoreOplog()); @@ -1929,6 +1952,7 @@ TEST_F( client.insert(NamespaceString::kConfigImagesNamespace.ns(), imageEntryForOp2.toBSON()); SessionCatalogMigrationSource migrationSource(opCtx(), kNs, kChunkRange, kShardKey); + migrationSource.init(opCtx()); const auto expectedSessionId = *getParentSessionId(sessionId); const auto expectedTxnNumber = *sessionId.getTxnNumber(); @@ -1991,6 +2015,7 @@ TEST_F(SessionCatalogMigrationSourceTest, client.insert(NamespaceString::kSessionTransactionsTableNamespace.ns(), txnRecord.toBSON()); SessionCatalogMigrationSource migrationSource(opCtx(), kNs, kChunkRange, kShardKey); + migrationSource.init(opCtx()); ASSERT_TRUE(migrationSource.fetchNextOplog(opCtx())); ASSERT_TRUE(migrationSource.hasMoreOplog()); @@ -2030,6 +2055,7 @@ TEST_F(SessionCatalogMigrationSourceTest, IgnorePreparedInternalTransactionForNo client.insert(NamespaceString::kSessionTransactionsTableNamespace.ns(), txnRecord.toBSON()); SessionCatalogMigrationSource migrationSource(opCtx(), kNs, kChunkRange, kShardKey); + migrationSource.init(opCtx()); ASSERT_FALSE(migrationSource.fetchNextOplog(opCtx())); ASSERT_FALSE(migrationSource.hasMoreOplog()); @@ -2061,6 +2087,7 @@ TEST_F(SessionCatalogMigrationSourceTest, IgnorePreparedInternalTransactionForRe client.insert(NamespaceString::kSessionTransactionsTableNamespace.ns(), txnRecord.toBSON()); SessionCatalogMigrationSource migrationSource(opCtx(), kNs, kChunkRange, kShardKey); + migrationSource.init(opCtx()); ASSERT_FALSE(migrationSource.fetchNextOplog(opCtx())); ASSERT_FALSE(migrationSource.hasMoreOplog()); @@ -2080,6 +2107,7 @@ TEST_F(SessionCatalogMigrationSourceTest, IgnoreInProgressTransaction) { client.insert(NamespaceString::kSessionTransactionsTableNamespace.ns(), txnRecord.toBSON()); SessionCatalogMigrationSource migrationSource(opCtx(), kNs, kChunkRange, kShardKey); + migrationSource.init(opCtx()); ASSERT_FALSE(migrationSource.fetchNextOplog(opCtx())); ASSERT_FALSE(migrationSource.hasMoreOplog()); @@ -2129,6 +2157,7 @@ TEST_F(SessionCatalogMigrationSourceTest, IgnoreAbortedTransaction) { client.insert(NamespaceString::kSessionTransactionsTableNamespace.ns(), txnRecord.toBSON()); SessionCatalogMigrationSource migrationSource(opCtx(), kNs, kChunkRange, kShardKey); + migrationSource.init(opCtx()); ASSERT_FALSE(migrationSource.fetchNextOplog(opCtx())); ASSERT_FALSE(migrationSource.hasMoreOplog()); @@ -2185,6 +2214,7 @@ TEST_F(SessionCatalogMigrationSourceTest, insertOplogEntry(insertOplog); SessionCatalogMigrationSource migrationSource(opCtx(), kNs, kChunkRange, kShardKey); + migrationSource.init(opCtx()); // Function to verify the oplog entry corresponding to the retryable write. auto checkRetryableWriteEntry = [&] { @@ -2264,6 +2294,7 @@ TEST_F(SessionCatalogMigrationSourceTest, FindAndModifyDeleteNotTouchingChunkIsI client.insert(NamespaceString::kSessionTransactionsTableNamespace.ns(), sessionRecord.toBSON()); SessionCatalogMigrationSource migrationSource(opCtx(), kNs, kChunkRange, kShardKey); + migrationSource.init(opCtx()); ASSERT_FALSE(migrationSource.fetchNextOplog(opCtx())); } @@ -2306,6 +2337,7 @@ TEST_F(SessionCatalogMigrationSourceTest, FindAndModifyUpdatePrePostNotTouchingC client.insert(NamespaceString::kSessionTransactionsTableNamespace.ns(), sessionRecord.toBSON()); SessionCatalogMigrationSource migrationSource(opCtx(), kNs, kChunkRange, kShardKey); + migrationSource.init(opCtx()); ASSERT_FALSE(migrationSource.fetchNextOplog(opCtx())); } @@ -2350,6 +2382,7 @@ TEST_F(SessionCatalogMigrationSourceTest, client.insert(NamespaceString::kSessionTransactionsTableNamespace.ns(), sessionRecord.toBSON()); SessionCatalogMigrationSource migrationSource(opCtx(), kNs, kChunkRange, kShardKey); + migrationSource.init(opCtx()); ASSERT_TRUE(migrationSource.fetchNextOplog(opCtx())); auto expectedSequence = {entry1, entry2}; @@ -2407,6 +2440,7 @@ TEST_F(SessionCatalogMigrationSourceTest, client.insert(NamespaceString::kSessionTransactionsTableNamespace.ns(), sessionRecord.toBSON()); SessionCatalogMigrationSource migrationSource(opCtx(), kNs, kChunkRange, kShardKey); + migrationSource.init(opCtx()); ASSERT_FALSE(migrationSource.fetchNextOplog(opCtx())); } @@ -2450,6 +2484,7 @@ TEST_F(SessionCatalogMigrationSourceTest, FindAndModifyUpdateNotTouchingChunkSho client.insert(NamespaceString::kSessionTransactionsTableNamespace.ns(), sessionRecord.toBSON()); SessionCatalogMigrationSource migrationSource(opCtx(), kNs, kChunkRange, kShardKey); + migrationSource.init(opCtx()); ASSERT_FALSE(migrationSource.fetchNextOplog(opCtx())); } @@ -2531,6 +2566,7 @@ TEST_F(SessionCatalogMigrationSourceTest, TwoSessionWithTwoWritesContainingWrite insertOplogEntry(entry2b); SessionCatalogMigrationSource migrationSource(opCtx(), kNs, kChunkRange, kShardKey); + migrationSource.init(opCtx()); ASSERT_TRUE(migrationSource.fetchNextOplog(opCtx())); auto expectedSequence = {entry1a, entry2b, entry2a}; @@ -2580,6 +2616,7 @@ TEST_F(SessionCatalogMigrationSourceTest, UntransferredDataSizeWithCommittedWrit // Check for the initial state of the SessionCatalogMigrationSource, and drain the majority // committed session writes. SessionCatalogMigrationSource migrationSource(opCtx(), kNs, kChunkRange, kShardKey); + migrationSource.init(opCtx()); ASSERT_TRUE(migrationSource.hasMoreOplog()); ASSERT_FALSE(migrationSource.inCatchupPhase()); migrationSource.fetchNextOplog(opCtx()); @@ -2619,6 +2656,7 @@ TEST_F(SessionCatalogMigrationSourceTest, UntransferredDataSizeWithNoCommittedWr const auto txnNumber = TxnNumber{1}; SessionCatalogMigrationSource migrationSource(opCtx(), kNs, kChunkRange, kShardKey); + migrationSource.init(opCtx()); auto entry = makeOplogEntry( repl::OpTime(Timestamp(52, 345), 2), // optime @@ -2691,6 +2729,7 @@ TEST_F(SessionCatalogMigrationSourceTest, FilterRewrittenOplogEntriesOutsideChun insertOplogEntry(entry); } SessionCatalogMigrationSource migrationSource(opCtx(), kNs, kChunkRange, kShardKey); + migrationSource.init(opCtx()); std::vector<repl::OplogEntry> filteredEntries = {entries.at(1)}; while (migrationSource.fetchNextOplog(opCtx())) { @@ -2737,6 +2776,7 @@ TEST_F(SessionCatalogMigrationSourceTest, } SessionCatalogMigrationSource migrationSource(opCtx(), kNs, kChunkRange, kShardKey); + migrationSource.init(opCtx()); std::vector<repl::OplogEntry> filteredEntries = {entries.at(1)}; |