summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorwenqinYe <wenqin908@gmail.com>2022-11-28 22:25:48 +0000
committerEvergreen Agent <no-reply@evergreen.mongodb.com>2022-11-28 23:05:01 +0000
commitee8a38cc92b5ff1e39ff2e702a17e25d29280409 (patch)
tree9a9bd3a5e9099f9443fc8e7b5dabdc47cf58b869
parent869c7519c280bc999243eb334747e842ed2821f6 (diff)
downloadmongo-ee8a38cc92b5ff1e39ff2e702a17e25d29280409.tar.gz
SERVER-71544 Fix race condition on _sessionCatalogSource in LogOpShardingHandler::commit
-rw-r--r--src/mongo/db/s/migration_chunk_cloner_source_legacy.cpp21
-rw-r--r--src/mongo/db/s/migration_chunk_cloner_source_legacy.h6
-rw-r--r--src/mongo/db/s/migration_chunk_cloner_source_legacy_test.cpp21
-rw-r--r--src/mongo/db/s/migration_source_manager.cpp2
-rw-r--r--src/mongo/db/s/session_catalog_migration_source.cpp3
-rw-r--r--src/mongo/db/s/session_catalog_migration_source.h7
-rw-r--r--src/mongo/db/s/session_catalog_migration_source_test.cpp40
7 files changed, 79 insertions, 21 deletions
diff --git a/src/mongo/db/s/migration_chunk_cloner_source_legacy.cpp b/src/mongo/db/s/migration_chunk_cloner_source_legacy.cpp
index 6607d2b2f33..cfaabdef023 100644
--- a/src/mongo/db/s/migration_chunk_cloner_source_legacy.cpp
+++ b/src/mongo/db/s/migration_chunk_cloner_source_legacy.cpp
@@ -288,6 +288,7 @@ void LogTransactionOperationsForShardingHandler::commit(OperationContext* opCtx,
}
MigrationChunkClonerSourceLegacy::MigrationChunkClonerSourceLegacy(
+ OperationContext* opCtx,
const ShardsvrMoveRange& request,
const WriteConcernOptions& writeConcern,
const BSONObj& shardKeyPattern,
@@ -300,7 +301,13 @@ MigrationChunkClonerSourceLegacy::MigrationChunkClonerSourceLegacy(
_args.getToShard().toString())),
_donorConnStr(std::move(donorConnStr)),
_recipientHost(std::move(recipientHost)),
- _forceJumbo(_args.getForceJumbo() != ForceJumbo::kDoNotForce) {}
+ _forceJumbo(_args.getForceJumbo() != ForceJumbo::kDoNotForce) {
+ auto const replCoord = repl::ReplicationCoordinator::get(opCtx);
+ if (replCoord->getReplicationMode() == repl::ReplicationCoordinator::modeReplSet) {
+ _sessionCatalogSource = std::make_unique<SessionCatalogMigrationSource>(
+ opCtx, nss(), ChunkRange(getMin(), getMax()), _shardKeyPattern.getKeyPattern());
+ }
+}
MigrationChunkClonerSourceLegacy::~MigrationChunkClonerSourceLegacy() {
invariant(_state == kDone);
@@ -313,10 +320,8 @@ Status MigrationChunkClonerSourceLegacy::startClone(OperationContext* opCtx,
invariant(_state == kNew);
invariant(!opCtx->lockState()->isLocked());
- auto const replCoord = repl::ReplicationCoordinator::get(opCtx);
- if (replCoord->getReplicationMode() == repl::ReplicationCoordinator::modeReplSet) {
- _sessionCatalogSource = std::make_unique<SessionCatalogMigrationSource>(
- opCtx, nss(), ChunkRange(getMin(), getMax()), _shardKeyPattern.getKeyPattern());
+ if (_sessionCatalogSource) {
+ _sessionCatalogSource->init(opCtx);
// Prime up the session migration source if there are oplog entries to migrate.
_sessionCatalogSource->fetchNextOplog(opCtx);
@@ -592,10 +597,8 @@ void MigrationChunkClonerSourceLegacy::onDeleteOp(OperationContext* opCtx,
void MigrationChunkClonerSourceLegacy::_addToSessionMigrationOptimeQueue(
const repl::OpTime& opTime,
SessionCatalogMigrationSource::EntryAtOpTimeType entryAtOpTimeType) {
- if (auto sessionSource = _sessionCatalogSource.get()) {
- if (!opTime.isNull()) {
- sessionSource->notifyNewWriteOpTime(opTime, entryAtOpTimeType);
- }
+ if (_sessionCatalogSource && !opTime.isNull()) {
+ _sessionCatalogSource->notifyNewWriteOpTime(opTime, entryAtOpTimeType);
}
}
diff --git a/src/mongo/db/s/migration_chunk_cloner_source_legacy.h b/src/mongo/db/s/migration_chunk_cloner_source_legacy.h
index 02a9a04c266..8dfd01e45cb 100644
--- a/src/mongo/db/s/migration_chunk_cloner_source_legacy.h
+++ b/src/mongo/db/s/migration_chunk_cloner_source_legacy.h
@@ -88,7 +88,8 @@ class MigrationChunkClonerSourceLegacy final : public MigrationChunkClonerSource
MigrationChunkClonerSourceLegacy& operator=(const MigrationChunkClonerSourceLegacy&) = delete;
public:
- MigrationChunkClonerSourceLegacy(const ShardsvrMoveRange& request,
+ MigrationChunkClonerSourceLegacy(OperationContext* opCtx,
+ const ShardsvrMoveRange& request,
const WriteConcernOptions& writeConcern,
const BSONObj& shardKeyPattern,
ConnectionString donorConnStr,
@@ -343,7 +344,7 @@ private:
std::unique_ptr<SessionCatalogMigrationSource> _sessionCatalogSource;
// Protects the entries below
- Mutex _mutex = MONGO_MAKE_LATCH("MigrationChunkClonerSourceLegacy::_mutex");
+ mutable Mutex _mutex = MONGO_MAKE_LATCH("MigrationChunkClonerSourceLegacy::_mutex");
// The current state of the cloner
State _state{kNew};
@@ -386,7 +387,6 @@ private:
// False if the move chunk request specified ForceJumbo::kDoNotForce, true otherwise.
const bool _forceJumbo;
-
struct JumboChunkCloneState {
// Plan executor for collection scan used to clone docs.
std::unique_ptr<PlanExecutor, PlanExecutor::Deleter> clonerExec;
diff --git a/src/mongo/db/s/migration_chunk_cloner_source_legacy_test.cpp b/src/mongo/db/s/migration_chunk_cloner_source_legacy_test.cpp
index b43c52d607d..1a2b2427172 100644
--- a/src/mongo/db/s/migration_chunk_cloner_source_legacy_test.cpp
+++ b/src/mongo/db/s/migration_chunk_cloner_source_legacy_test.cpp
@@ -276,7 +276,8 @@ TEST_F(MigrationChunkClonerSourceLegacyTest, CorrectDocumentsFetched) {
const ShardsvrMoveRange req =
createMoveRangeRequest(ChunkRange(BSON("X" << 100), BSON("X" << 200)));
- MigrationChunkClonerSourceLegacy cloner(req,
+ MigrationChunkClonerSourceLegacy cloner(operationContext(),
+ req,
WriteConcernOptions(),
kShardKeyPattern,
kDonorConnStr,
@@ -387,7 +388,8 @@ TEST_F(MigrationChunkClonerSourceLegacyTest, RemoveDuplicateDocuments) {
const ShardsvrMoveRange req =
createMoveRangeRequest(ChunkRange(BSON("X" << 100), BSON("X" << 200)));
- MigrationChunkClonerSourceLegacy cloner(req,
+ MigrationChunkClonerSourceLegacy cloner(operationContext(),
+ req,
WriteConcernOptions(),
kShardKeyPattern,
kDonorConnStr,
@@ -481,7 +483,8 @@ TEST_F(MigrationChunkClonerSourceLegacyTest, OneLargeDocumentTransferMods) {
const ShardsvrMoveRange req =
createMoveRangeRequest(ChunkRange(BSON("X" << 1), BSON("X" << 100)));
- MigrationChunkClonerSourceLegacy cloner(req,
+ MigrationChunkClonerSourceLegacy cloner(operationContext(),
+ req,
WriteConcernOptions(),
kShardKeyPattern,
kDonorConnStr,
@@ -543,7 +546,8 @@ TEST_F(MigrationChunkClonerSourceLegacyTest, ManySmallDocumentsTransferMods) {
const ShardsvrMoveRange req =
createMoveRangeRequest(ChunkRange(BSON("X" << 1), BSON("X" << 1000000)));
- MigrationChunkClonerSourceLegacy cloner(req,
+ MigrationChunkClonerSourceLegacy cloner(operationContext(),
+ req,
WriteConcernOptions(),
kShardKeyPattern,
kDonorConnStr,
@@ -616,7 +620,8 @@ TEST_F(MigrationChunkClonerSourceLegacyTest, ManySmallDocumentsTransferMods) {
TEST_F(MigrationChunkClonerSourceLegacyTest, CollectionNotFound) {
const ShardsvrMoveRange req =
createMoveRangeRequest(ChunkRange(BSON("X" << 100), BSON("X" << 200)));
- MigrationChunkClonerSourceLegacy cloner(req,
+ MigrationChunkClonerSourceLegacy cloner(operationContext(),
+ req,
WriteConcernOptions(),
kShardKeyPattern,
kDonorConnStr,
@@ -636,7 +641,8 @@ TEST_F(MigrationChunkClonerSourceLegacyTest, ShardKeyIndexNotFound) {
const ShardsvrMoveRange req =
createMoveRangeRequest(ChunkRange(BSON("X" << 100), BSON("X" << 200)));
- MigrationChunkClonerSourceLegacy cloner(req,
+ MigrationChunkClonerSourceLegacy cloner(operationContext(),
+ req,
WriteConcernOptions(),
kShardKeyPattern,
kDonorConnStr,
@@ -656,7 +662,8 @@ TEST_F(MigrationChunkClonerSourceLegacyTest, FailedToEngageRecipientShard) {
const ShardsvrMoveRange req =
createMoveRangeRequest(ChunkRange(BSON("X" << 100), BSON("X" << 200)));
- MigrationChunkClonerSourceLegacy cloner(req,
+ MigrationChunkClonerSourceLegacy cloner(operationContext(),
+ req,
WriteConcernOptions(),
kShardKeyPattern,
kDonorConnStr,
diff --git a/src/mongo/db/s/migration_source_manager.cpp b/src/mongo/db/s/migration_source_manager.cpp
index da33366e8a8..d7b77300b3c 100644
--- a/src/mongo/db/s/migration_source_manager.cpp
+++ b/src/mongo/db/s/migration_source_manager.cpp
@@ -288,7 +288,7 @@ void MigrationSourceManager::startClone() {
// migration, write operations require the cloner to be present in order to track changes to
// the chunk which needs to be transmitted to the recipient.
_cloneDriver = std::make_shared<MigrationChunkClonerSourceLegacy>(
- _args, _writeConcern, metadata.getKeyPattern(), _donorConnStr, _recipientHost);
+ _opCtx, _args, _writeConcern, metadata.getKeyPattern(), _donorConnStr, _recipientHost);
_coordinator.emplace(_cloneDriver->getSessionId(),
_args.getFromShard(),
diff --git a/src/mongo/db/s/session_catalog_migration_source.cpp b/src/mongo/db/s/session_catalog_migration_source.cpp
index b6f0f93f4ed..d5b03f4d5e4 100644
--- a/src/mongo/db/s/session_catalog_migration_source.cpp
+++ b/src/mongo/db/s/session_catalog_migration_source.cpp
@@ -213,8 +213,9 @@ SessionCatalogMigrationSource::SessionCatalogMigrationSource(OperationContext* o
: _ns(std::move(ns)),
_rollbackIdAtInit(repl::ReplicationProcess::get(opCtx)->getRollbackID()),
_chunkRange(std::move(chunk)),
- _keyPattern(shardKey) {
+ _keyPattern(shardKey) {}
+void SessionCatalogMigrationSource::init(OperationContext* opCtx) {
DBDirectClient client(opCtx);
FindCommandRequest findRequest{NamespaceString::kSessionTransactionsTableNamespace};
// Skip internal sessions for retryable writes with aborted or in progress transactions since
diff --git a/src/mongo/db/s/session_catalog_migration_source.h b/src/mongo/db/s/session_catalog_migration_source.h
index 503b718eeaa..2f826b369c8 100644
--- a/src/mongo/db/s/session_catalog_migration_source.h
+++ b/src/mongo/db/s/session_catalog_migration_source.h
@@ -95,6 +95,13 @@ public:
KeyPattern shardKey);
/**
+ * Gets the session oplog entries to be sent to the destination. The initialization is separated
+ * from the constructor to allow the member functions of the SessionCatalogMigrationSource to be
+ * called before the initialization step is finished.
+ */
+ void init(OperationContext* opCtx);
+
+ /**
* Returns true if there are more oplog entries to fetch at this moment. Note that new writes
* can still continue to come in after this has returned false, so it can become true again.
* Once this has returned false, this means that it has depleted the existing buffer so it
diff --git a/src/mongo/db/s/session_catalog_migration_source_test.cpp b/src/mongo/db/s/session_catalog_migration_source_test.cpp
index 49c40b93d3c..33baf246cba 100644
--- a/src/mongo/db/s/session_catalog_migration_source_test.cpp
+++ b/src/mongo/db/s/session_catalog_migration_source_test.cpp
@@ -307,6 +307,7 @@ TEST_F(SessionCatalogMigrationSourceTest, OneSessionWithTwoWrites) {
client.insert(NamespaceString::kSessionTransactionsTableNamespace.ns(), sessionRecord.toBSON());
SessionCatalogMigrationSource migrationSource(opCtx(), kNs, kChunkRange, kShardKey);
+ migrationSource.init(opCtx());
ASSERT_TRUE(migrationSource.fetchNextOplog(opCtx()));
{
@@ -368,6 +369,7 @@ TEST_F(SessionCatalogMigrationSourceTest, OneSessionWithTwoWritesMultiStmtIds) {
client.insert(NamespaceString::kSessionTransactionsTableNamespace.ns(), sessionRecord.toBSON());
SessionCatalogMigrationSource migrationSource(opCtx(), kNs, kChunkRange, kShardKey);
+ migrationSource.init(opCtx());
ASSERT_TRUE(migrationSource.fetchNextOplog(opCtx()));
{
@@ -466,6 +468,7 @@ TEST_F(SessionCatalogMigrationSourceTest, TwoSessionWithTwoWrites) {
insertOplogEntry(entry2b);
SessionCatalogMigrationSource migrationSource(opCtx(), kNs, kChunkRange, kShardKey);
+ migrationSource.init(opCtx());
ASSERT_TRUE(migrationSource.fetchNextOplog(opCtx()));
auto checkNextBatch = [this, &migrationSource](const repl::OplogEntry& firstExpectedOplog,
@@ -575,6 +578,7 @@ TEST_F(SessionCatalogMigrationSourceTest, OneSessionWithFindAndModifyPreImageAnd
client.insert(NamespaceString::kSessionTransactionsTableNamespace.ns(), sessionRecord.toBSON());
SessionCatalogMigrationSource migrationSource(opCtx(), kNs, kChunkRange, kShardKey);
+ migrationSource.init(opCtx());
ASSERT_TRUE(migrationSource.fetchNextOplog(opCtx()));
auto expectedSequence = {entry3, entry4, entry1, entry2};
@@ -657,6 +661,7 @@ TEST_F(SessionCatalogMigrationSourceTest,
client.insert(NamespaceString::kSessionTransactionsTableNamespace.ns(), sessionRecord.toBSON());
SessionCatalogMigrationSource migrationSource(opCtx(), kNs, kChunkRange, kShardKey);
+ migrationSource.init(opCtx());
ASSERT_TRUE(migrationSource.fetchNextOplog(opCtx()));
auto expectedSequence = {entry3, entry4, entry1, entry2};
@@ -713,6 +718,7 @@ TEST_F(SessionCatalogMigrationSourceTest, ForgeImageEntriesWhenFetchingEntriesWi
client.insert(NamespaceString::kSessionTransactionsTableNamespace.ns(), sessionRecord.toBSON());
SessionCatalogMigrationSource migrationSource(opCtx(), kNs, kChunkRange, kShardKey);
+ migrationSource.init(opCtx());
// The next oplog entry should be the forged preImage entry.
ASSERT_TRUE(migrationSource.fetchNextOplog(opCtx()));
ASSERT_TRUE(migrationSource.hasMoreOplog());
@@ -791,6 +797,7 @@ TEST_F(SessionCatalogMigrationSourceTest, OplogWithOtherNsShouldBeIgnored) {
sessionRecord2.toBSON());
SessionCatalogMigrationSource migrationSource(opCtx(), kNs, kChunkRange, kShardKey);
+ migrationSource.init(opCtx());
ASSERT_TRUE(migrationSource.fetchNextOplog(opCtx()));
ASSERT_TRUE(migrationSource.hasMoreOplog());
@@ -857,6 +864,7 @@ TEST_F(SessionCatalogMigrationSourceTest, SessionDumpWithMultipleNewWrites) {
insertOplogEntry(entry3);
SessionCatalogMigrationSource migrationSource(opCtx(), kNs, kChunkRange, kShardKey);
+ migrationSource.init(opCtx());
ASSERT_TRUE(migrationSource.fetchNextOplog(opCtx()));
migrationSource.notifyNewWriteOpTime(
@@ -894,6 +902,7 @@ TEST_F(SessionCatalogMigrationSourceTest, SessionDumpWithMultipleNewWrites) {
TEST_F(SessionCatalogMigrationSourceTest, ShouldAssertIfOplogCannotBeFound) {
SessionCatalogMigrationSource migrationSource(opCtx(), kNs, kChunkRange, kShardKey);
+ migrationSource.init(opCtx());
ASSERT_FALSE(migrationSource.fetchNextOplog(opCtx()));
migrationSource.notifyNewWriteOpTime(
@@ -906,6 +915,7 @@ TEST_F(SessionCatalogMigrationSourceTest, ShouldAssertIfOplogCannotBeFound) {
TEST_F(SessionCatalogMigrationSourceTest,
ReturnDeadEndSentinelOplogEntryForNewCommittedNonInternalTransaction) {
SessionCatalogMigrationSource migrationSource(opCtx(), kNs, kChunkRange, kShardKey);
+ migrationSource.init(opCtx());
ASSERT_FALSE(migrationSource.fetchNextOplog(opCtx()));
const auto sessionId = makeLogicalSessionIdForTest();
@@ -944,6 +954,7 @@ DEATH_TEST_F(SessionCatalogMigrationSourceTest,
ThrowUponSeeingNewCommittedForInternalTransactionForNonRetryableWrite,
"Cannot add op time for a non-retryable internal transaction") {
SessionCatalogMigrationSource migrationSource(opCtx(), kNs, kChunkRange, kShardKey);
+ migrationSource.init(opCtx());
ASSERT_FALSE(migrationSource.fetchNextOplog(opCtx()));
const auto sessionId = makeLogicalSessionIdWithTxnUUIDForTest();
@@ -969,6 +980,7 @@ DEATH_TEST_F(SessionCatalogMigrationSourceTest,
TEST_F(SessionCatalogMigrationSourceTest,
DeriveOplogEntriesForNewCommittedInternalTransactionForRetryableWriteBasic) {
SessionCatalogMigrationSource migrationSource(opCtx(), kNs, kChunkRange, kShardKey);
+ migrationSource.init(opCtx());
ASSERT_FALSE(migrationSource.fetchNextOplog(opCtx()));
const auto sessionId = makeLogicalSessionIdWithTxnNumberAndUUIDForTest();
@@ -1041,6 +1053,7 @@ TEST_F(SessionCatalogMigrationSourceTest,
TEST_F(SessionCatalogMigrationSourceTest,
DeriveOplogEntriesForNewCommittedInternalTransactionForRetryableWriteFetchPrePostImage) {
SessionCatalogMigrationSource migrationSource(opCtx(), kNs, kChunkRange, kShardKey);
+ migrationSource.init(opCtx());
ASSERT_FALSE(migrationSource.fetchNextOplog(opCtx()));
const auto sessionId = makeLogicalSessionIdWithTxnNumberAndUUIDForTest();
@@ -1143,6 +1156,7 @@ TEST_F(SessionCatalogMigrationSourceTest,
TEST_F(SessionCatalogMigrationSourceTest,
DeriveOplogEntriesForNewCommittedInternalTransactionForRetryableWriteForgePrePostImage) {
SessionCatalogMigrationSource migrationSource(opCtx(), kNs, kChunkRange, kShardKey);
+ migrationSource.init(opCtx());
ASSERT_FALSE(migrationSource.fetchNextOplog(opCtx()));
std::vector<repl::RetryImageEnum> cases{repl::RetryImageEnum::kPreImage,
@@ -1232,6 +1246,7 @@ TEST_F(SessionCatalogMigrationSourceTest, ShouldBeAbleInsertNewWritesAfterBuffer
const auto txnNumber = TxnNumber{1};
SessionCatalogMigrationSource migrationSource(opCtx(), kNs, kChunkRange, kShardKey);
+ migrationSource.init(opCtx());
ASSERT_FALSE(migrationSource.fetchNextOplog(opCtx()));
{
@@ -1340,6 +1355,7 @@ TEST_F(SessionCatalogMigrationSourceTest, ReturnsDeadEndSentinelForIncompleteHis
client.insert(NamespaceString::kSessionTransactionsTableNamespace.ns(), sessionRecord.toBSON());
SessionCatalogMigrationSource migrationSource(opCtx(), kNs, kChunkRange, kShardKey);
+ migrationSource.init(opCtx());
ASSERT_TRUE(migrationSource.fetchNextOplog(opCtx()));
{
@@ -1400,6 +1416,7 @@ TEST_F(SessionCatalogMigrationSourceTest, ShouldAssertWhenRollbackDetected) {
client.insert(NamespaceString::kSessionTransactionsTableNamespace.ns(), sessionRecord.toBSON());
SessionCatalogMigrationSource migrationSource(opCtx(), kNs, kChunkRange, kShardKey);
+ migrationSource.init(opCtx());
ASSERT_TRUE(migrationSource.fetchNextOplog(opCtx()));
{
@@ -1443,6 +1460,7 @@ TEST_F(SessionCatalogMigrationSourceTest,
client.insert(NamespaceString::kSessionTransactionsTableNamespace.ns(), txnRecord.toBSON());
SessionCatalogMigrationSource migrationSource(opCtx(), kNs, kChunkRange, kShardKey);
+ migrationSource.init(opCtx());
ASSERT_TRUE(migrationSource.fetchNextOplog(opCtx()));
ASSERT_TRUE(migrationSource.hasMoreOplog());
@@ -1482,6 +1500,7 @@ TEST_F(SessionCatalogMigrationSourceTest, IgnoreCommittedInternalTransactionForN
client.insert(NamespaceString::kSessionTransactionsTableNamespace.ns(), txnRecord.toBSON());
SessionCatalogMigrationSource migrationSource(opCtx(), kNs, kChunkRange, kShardKey);
+ migrationSource.init(opCtx());
ASSERT_FALSE(migrationSource.fetchNextOplog(opCtx()));
ASSERT_FALSE(migrationSource.hasMoreOplog());
@@ -1566,6 +1585,7 @@ TEST_F(SessionCatalogMigrationSourceTest,
client.insert(NamespaceString::kSessionTransactionsTableNamespace.ns(), txnRecord.toBSON());
SessionCatalogMigrationSource migrationSource(opCtx(), kNs, kChunkRange, kShardKey);
+ migrationSource.init(opCtx());
const auto expectedSessionId = *getParentSessionId(sessionId);
const auto expectedTxnNumber = *sessionId.getTxnNumber();
@@ -1690,6 +1710,7 @@ TEST_F(SessionCatalogMigrationSourceTest,
client.insert(NamespaceString::kSessionTransactionsTableNamespace.ns(), txnRecord.toBSON());
SessionCatalogMigrationSource migrationSource(opCtx(), kNs, kChunkRange, kShardKey);
+ migrationSource.init(opCtx());
const auto expectedSessionId = *getParentSessionId(sessionId);
const auto expectedTxnNumber = *sessionId.getTxnNumber();
@@ -1782,6 +1803,7 @@ TEST_F(SessionCatalogMigrationSourceTest,
// Create a SessionCatalogMigrationSource. It should return only the oplog entry for the
// internal session with the latest txnNumber.
SessionCatalogMigrationSource migrationSource(opCtx(), kNs, kChunkRange, kShardKey);
+ migrationSource.init(opCtx());
ASSERT_TRUE(migrationSource.fetchNextOplog(opCtx()));
ASSERT_TRUE(migrationSource.hasMoreOplog());
@@ -1819,6 +1841,7 @@ TEST_F(SessionCatalogMigrationSourceTest,
// Create another SessionCatalogMigrationSource. It should still return only the oplog entry
// for the internal session with the latest txnNumber.
SessionCatalogMigrationSource migrationSource(opCtx(), kNs, kChunkRange, kShardKey);
+ migrationSource.init(opCtx());
ASSERT_TRUE(migrationSource.fetchNextOplog(opCtx()));
ASSERT_TRUE(migrationSource.hasMoreOplog());
@@ -1929,6 +1952,7 @@ TEST_F(
client.insert(NamespaceString::kConfigImagesNamespace.ns(), imageEntryForOp2.toBSON());
SessionCatalogMigrationSource migrationSource(opCtx(), kNs, kChunkRange, kShardKey);
+ migrationSource.init(opCtx());
const auto expectedSessionId = *getParentSessionId(sessionId);
const auto expectedTxnNumber = *sessionId.getTxnNumber();
@@ -1991,6 +2015,7 @@ TEST_F(SessionCatalogMigrationSourceTest,
client.insert(NamespaceString::kSessionTransactionsTableNamespace.ns(), txnRecord.toBSON());
SessionCatalogMigrationSource migrationSource(opCtx(), kNs, kChunkRange, kShardKey);
+ migrationSource.init(opCtx());
ASSERT_TRUE(migrationSource.fetchNextOplog(opCtx()));
ASSERT_TRUE(migrationSource.hasMoreOplog());
@@ -2030,6 +2055,7 @@ TEST_F(SessionCatalogMigrationSourceTest, IgnorePreparedInternalTransactionForNo
client.insert(NamespaceString::kSessionTransactionsTableNamespace.ns(), txnRecord.toBSON());
SessionCatalogMigrationSource migrationSource(opCtx(), kNs, kChunkRange, kShardKey);
+ migrationSource.init(opCtx());
ASSERT_FALSE(migrationSource.fetchNextOplog(opCtx()));
ASSERT_FALSE(migrationSource.hasMoreOplog());
@@ -2061,6 +2087,7 @@ TEST_F(SessionCatalogMigrationSourceTest, IgnorePreparedInternalTransactionForRe
client.insert(NamespaceString::kSessionTransactionsTableNamespace.ns(), txnRecord.toBSON());
SessionCatalogMigrationSource migrationSource(opCtx(), kNs, kChunkRange, kShardKey);
+ migrationSource.init(opCtx());
ASSERT_FALSE(migrationSource.fetchNextOplog(opCtx()));
ASSERT_FALSE(migrationSource.hasMoreOplog());
@@ -2080,6 +2107,7 @@ TEST_F(SessionCatalogMigrationSourceTest, IgnoreInProgressTransaction) {
client.insert(NamespaceString::kSessionTransactionsTableNamespace.ns(), txnRecord.toBSON());
SessionCatalogMigrationSource migrationSource(opCtx(), kNs, kChunkRange, kShardKey);
+ migrationSource.init(opCtx());
ASSERT_FALSE(migrationSource.fetchNextOplog(opCtx()));
ASSERT_FALSE(migrationSource.hasMoreOplog());
@@ -2129,6 +2157,7 @@ TEST_F(SessionCatalogMigrationSourceTest, IgnoreAbortedTransaction) {
client.insert(NamespaceString::kSessionTransactionsTableNamespace.ns(), txnRecord.toBSON());
SessionCatalogMigrationSource migrationSource(opCtx(), kNs, kChunkRange, kShardKey);
+ migrationSource.init(opCtx());
ASSERT_FALSE(migrationSource.fetchNextOplog(opCtx()));
ASSERT_FALSE(migrationSource.hasMoreOplog());
@@ -2185,6 +2214,7 @@ TEST_F(SessionCatalogMigrationSourceTest,
insertOplogEntry(insertOplog);
SessionCatalogMigrationSource migrationSource(opCtx(), kNs, kChunkRange, kShardKey);
+ migrationSource.init(opCtx());
// Function to verify the oplog entry corresponding to the retryable write.
auto checkRetryableWriteEntry = [&] {
@@ -2264,6 +2294,7 @@ TEST_F(SessionCatalogMigrationSourceTest, FindAndModifyDeleteNotTouchingChunkIsI
client.insert(NamespaceString::kSessionTransactionsTableNamespace.ns(), sessionRecord.toBSON());
SessionCatalogMigrationSource migrationSource(opCtx(), kNs, kChunkRange, kShardKey);
+ migrationSource.init(opCtx());
ASSERT_FALSE(migrationSource.fetchNextOplog(opCtx()));
}
@@ -2306,6 +2337,7 @@ TEST_F(SessionCatalogMigrationSourceTest, FindAndModifyUpdatePrePostNotTouchingC
client.insert(NamespaceString::kSessionTransactionsTableNamespace.ns(), sessionRecord.toBSON());
SessionCatalogMigrationSource migrationSource(opCtx(), kNs, kChunkRange, kShardKey);
+ migrationSource.init(opCtx());
ASSERT_FALSE(migrationSource.fetchNextOplog(opCtx()));
}
@@ -2350,6 +2382,7 @@ TEST_F(SessionCatalogMigrationSourceTest,
client.insert(NamespaceString::kSessionTransactionsTableNamespace.ns(), sessionRecord.toBSON());
SessionCatalogMigrationSource migrationSource(opCtx(), kNs, kChunkRange, kShardKey);
+ migrationSource.init(opCtx());
ASSERT_TRUE(migrationSource.fetchNextOplog(opCtx()));
auto expectedSequence = {entry1, entry2};
@@ -2407,6 +2440,7 @@ TEST_F(SessionCatalogMigrationSourceTest,
client.insert(NamespaceString::kSessionTransactionsTableNamespace.ns(), sessionRecord.toBSON());
SessionCatalogMigrationSource migrationSource(opCtx(), kNs, kChunkRange, kShardKey);
+ migrationSource.init(opCtx());
ASSERT_FALSE(migrationSource.fetchNextOplog(opCtx()));
}
@@ -2450,6 +2484,7 @@ TEST_F(SessionCatalogMigrationSourceTest, FindAndModifyUpdateNotTouchingChunkSho
client.insert(NamespaceString::kSessionTransactionsTableNamespace.ns(), sessionRecord.toBSON());
SessionCatalogMigrationSource migrationSource(opCtx(), kNs, kChunkRange, kShardKey);
+ migrationSource.init(opCtx());
ASSERT_FALSE(migrationSource.fetchNextOplog(opCtx()));
}
@@ -2531,6 +2566,7 @@ TEST_F(SessionCatalogMigrationSourceTest, TwoSessionWithTwoWritesContainingWrite
insertOplogEntry(entry2b);
SessionCatalogMigrationSource migrationSource(opCtx(), kNs, kChunkRange, kShardKey);
+ migrationSource.init(opCtx());
ASSERT_TRUE(migrationSource.fetchNextOplog(opCtx()));
auto expectedSequence = {entry1a, entry2b, entry2a};
@@ -2580,6 +2616,7 @@ TEST_F(SessionCatalogMigrationSourceTest, UntransferredDataSizeWithCommittedWrit
// Check for the initial state of the SessionCatalogMigrationSource, and drain the majority
// committed session writes.
SessionCatalogMigrationSource migrationSource(opCtx(), kNs, kChunkRange, kShardKey);
+ migrationSource.init(opCtx());
ASSERT_TRUE(migrationSource.hasMoreOplog());
ASSERT_FALSE(migrationSource.inCatchupPhase());
migrationSource.fetchNextOplog(opCtx());
@@ -2619,6 +2656,7 @@ TEST_F(SessionCatalogMigrationSourceTest, UntransferredDataSizeWithNoCommittedWr
const auto txnNumber = TxnNumber{1};
SessionCatalogMigrationSource migrationSource(opCtx(), kNs, kChunkRange, kShardKey);
+ migrationSource.init(opCtx());
auto entry = makeOplogEntry(
repl::OpTime(Timestamp(52, 345), 2), // optime
@@ -2691,6 +2729,7 @@ TEST_F(SessionCatalogMigrationSourceTest, FilterRewrittenOplogEntriesOutsideChun
insertOplogEntry(entry);
}
SessionCatalogMigrationSource migrationSource(opCtx(), kNs, kChunkRange, kShardKey);
+ migrationSource.init(opCtx());
std::vector<repl::OplogEntry> filteredEntries = {entries.at(1)};
while (migrationSource.fetchNextOplog(opCtx())) {
@@ -2737,6 +2776,7 @@ TEST_F(SessionCatalogMigrationSourceTest,
}
SessionCatalogMigrationSource migrationSource(opCtx(), kNs, kChunkRange, kShardKey);
+ migrationSource.init(opCtx());
std::vector<repl::OplogEntry> filteredEntries = {entries.at(1)};