summaryrefslogtreecommitdiff
path: root/src/mongo/db/s/session_catalog_migration_source_test.cpp
diff options
context:
space:
mode:
authorRandolph Tan <randolph@10gen.com>2019-05-15 16:54:35 -0400
committerRandolph Tan <randolph@10gen.com>2019-05-17 16:49:22 -0400
commit32ade4aa6b6a73c3620486117388908ad9ad438c (patch)
tree757054f3a8c8bb898aeef89a9af8feabe7c9d215 /src/mongo/db/s/session_catalog_migration_source_test.cpp
parent57c127b9040a1203a86214cf0bf896aa069afbbe (diff)
downloadmongo-32ade4aa6b6a73c3620486117388908ad9ad438c.tar.gz
SERVER-41074 Make decision to send pre/post image oplog the same as the decision made for the originating oplog write.
Diffstat (limited to 'src/mongo/db/s/session_catalog_migration_source_test.cpp')
-rw-r--r--src/mongo/db/s/session_catalog_migration_source_test.cpp327
1 files changed, 304 insertions, 23 deletions
diff --git a/src/mongo/db/s/session_catalog_migration_source_test.cpp b/src/mongo/db/s/session_catalog_migration_source_test.cpp
index c4fabc0b443..16492fbb17c 100644
--- a/src/mongo/db/s/session_catalog_migration_source_test.cpp
+++ b/src/mongo/db/s/session_catalog_migration_source_test.cpp
@@ -46,6 +46,8 @@ namespace {
using executor::RemoteCommandRequest;
const NamespaceString kNs("a.b");
+const KeyPattern kShardKey(BSON("x" << 1));
+const ChunkRange kChunkRange(BSON("x" << 0), BSON("x" << 100));
class SessionCatalogMigrationSourceTest : public MockReplCoordServerFixture {};
@@ -107,7 +109,7 @@ repl::OplogEntry makeOplogEntry(repl::OpTime opTime,
}
TEST_F(SessionCatalogMigrationSourceTest, NoSessionsToTransferShouldNotHaveOplog) {
- SessionCatalogMigrationSource migrationSource(opCtx(), kNs);
+ SessionCatalogMigrationSource migrationSource(opCtx(), kNs, kChunkRange, kShardKey);
ASSERT_FALSE(migrationSource.fetchNextOplog(opCtx()));
ASSERT_FALSE(migrationSource.hasMoreOplog());
}
@@ -126,7 +128,7 @@ TEST_F(SessionCatalogMigrationSourceTest, OneSessionWithTwoWrites) {
auto entry2 =
makeOplogEntry(repl::OpTime(Timestamp(67, 54801), 2), // optime
repl::OpTypeEnum::kInsert, // op type
- BSON("y" << 50), // o
+ BSON("x" << 50), // o
boost::none, // o2
Date_t::now(), // wall clock time
1, // statement id
@@ -142,7 +144,7 @@ TEST_F(SessionCatalogMigrationSourceTest, OneSessionWithTwoWrites) {
DBDirectClient client(opCtx());
client.insert(NamespaceString::kSessionTransactionsTableNamespace.ns(), sessionRecord.toBSON());
- SessionCatalogMigrationSource migrationSource(opCtx(), kNs);
+ SessionCatalogMigrationSource migrationSource(opCtx(), kNs, kChunkRange, kShardKey);
ASSERT_TRUE(migrationSource.fetchNextOplog(opCtx()));
{
@@ -179,7 +181,7 @@ TEST_F(SessionCatalogMigrationSourceTest, TwoSessionWithTwoWrites) {
auto entry1b =
makeOplogEntry(repl::OpTime(Timestamp(67, 54801), 2), // optime
repl::OpTypeEnum::kInsert, // op type
- BSON("y" << 50), // o
+ BSON("x" << 50), // o
boost::none, // o2
Date_t::now(), // wall clock time
1, // statement id
@@ -207,7 +209,7 @@ TEST_F(SessionCatalogMigrationSourceTest, TwoSessionWithTwoWrites) {
auto entry2b =
makeOplogEntry(repl::OpTime(Timestamp(789, 13), 2), // optime
repl::OpTypeEnum::kDelete, // op type
- BSON("y" << 50), // o
+ BSON("x" << 50), // o
boost::none, // o2
Date_t::now(), // wall clock time
4, // statement id
@@ -227,7 +229,7 @@ TEST_F(SessionCatalogMigrationSourceTest, TwoSessionWithTwoWrites) {
insertOplogEntry(entry1b);
insertOplogEntry(entry2b);
- SessionCatalogMigrationSource migrationSource(opCtx(), kNs);
+ SessionCatalogMigrationSource migrationSource(opCtx(), kNs, kChunkRange, kShardKey);
ASSERT_TRUE(migrationSource.fetchNextOplog(opCtx()));
auto checkNextBatch = [this, &migrationSource](const repl::OplogEntry& firstExpectedOplog,
@@ -284,7 +286,7 @@ TEST_F(SessionCatalogMigrationSourceTest, OneSessionWithFindAndModifyPreImageAnd
auto entry2 = makeOplogEntry(
repl::OpTime(Timestamp(52, 346), 2), // optime
repl::OpTypeEnum::kDelete, // op type
- BSON("y" << 50), // o
+ BSON("x" << 50), // o
boost::none, // o2
Date_t::now(), // wall clock time
1, // statement id
@@ -305,8 +307,8 @@ TEST_F(SessionCatalogMigrationSourceTest, OneSessionWithFindAndModifyPreImageAnd
auto entry4 =
makeOplogEntry(repl::OpTime(Timestamp(73, 6), 2), // optime
repl::OpTypeEnum::kUpdate, // op type
- BSON("x" << 19), // o
- BSON("$inc" << BSON("x" << 1)), // o2
+ BSON("$inc" << BSON("x" << 1)), // o
+ BSON("x" << 19), // o2
Date_t::now(), // wall clock time
3, // statement id
entry2.getOpTime(), // optime of previous write within same transaction
@@ -323,7 +325,7 @@ TEST_F(SessionCatalogMigrationSourceTest, OneSessionWithFindAndModifyPreImageAnd
DBDirectClient client(opCtx());
client.insert(NamespaceString::kSessionTransactionsTableNamespace.ns(), sessionRecord.toBSON());
- SessionCatalogMigrationSource migrationSource(opCtx(), kNs);
+ SessionCatalogMigrationSource migrationSource(opCtx(), kNs, kChunkRange, kShardKey);
ASSERT_TRUE(migrationSource.fetchNextOplog(opCtx()));
auto expectedSequece = {entry3, entry4, entry1, entry2};
@@ -385,7 +387,7 @@ TEST_F(SessionCatalogMigrationSourceTest, OplogWithOtherNsShouldBeIgnored) {
client.insert(NamespaceString::kSessionTransactionsTableNamespace.ns(),
sessionRecord2.toBSON());
- SessionCatalogMigrationSource migrationSource(opCtx(), kNs);
+ SessionCatalogMigrationSource migrationSource(opCtx(), kNs, kChunkRange, kShardKey);
ASSERT_TRUE(migrationSource.fetchNextOplog(opCtx()));
ASSERT_TRUE(migrationSource.hasMoreOplog());
@@ -433,14 +435,14 @@ TEST_F(SessionCatalogMigrationSourceTest, SessionDumpWithMultipleNewWrites) {
auto entry3 = makeOplogEntry(
repl::OpTime(Timestamp(55, 12), 2), // optime
repl::OpTypeEnum::kInsert, // op type
- BSON("z" << 40), // o
+ BSON("x" << 40), // o
boost::none, // o2
Date_t::now(), // wall clock time
2, // statement id
repl::OpTime(Timestamp(0, 0), 0)); // optime of previous write within same transaction
insertOplogEntry(entry3);
- SessionCatalogMigrationSource migrationSource(opCtx(), kNs);
+ SessionCatalogMigrationSource migrationSource(opCtx(), kNs, kChunkRange, kShardKey);
ASSERT_TRUE(migrationSource.fetchNextOplog(opCtx()));
migrationSource.notifyNewWriteOpTime(
@@ -477,7 +479,7 @@ TEST_F(SessionCatalogMigrationSourceTest, SessionDumpWithMultipleNewWrites) {
}
TEST_F(SessionCatalogMigrationSourceTest, ShouldAssertIfOplogCannotBeFound) {
- SessionCatalogMigrationSource migrationSource(opCtx(), kNs);
+ SessionCatalogMigrationSource migrationSource(opCtx(), kNs, kChunkRange, kShardKey);
ASSERT_FALSE(migrationSource.fetchNextOplog(opCtx()));
migrationSource.notifyNewWriteOpTime(
@@ -488,7 +490,7 @@ TEST_F(SessionCatalogMigrationSourceTest, ShouldAssertIfOplogCannotBeFound) {
}
TEST_F(SessionCatalogMigrationSourceTest, ShouldBeAbleInsertNewWritesAfterBufferWasDepleted) {
- SessionCatalogMigrationSource migrationSource(opCtx(), kNs);
+ SessionCatalogMigrationSource migrationSource(opCtx(), kNs, kChunkRange, kShardKey);
ASSERT_FALSE(migrationSource.fetchNextOplog(opCtx()));
{
@@ -544,7 +546,7 @@ TEST_F(SessionCatalogMigrationSourceTest, ShouldBeAbleInsertNewWritesAfterBuffer
auto entry = makeOplogEntry(
repl::OpTime(Timestamp(55, 12), 2), // optime
repl::OpTypeEnum::kInsert, // op type
- BSON("z" << 40), // o
+ BSON("x" << 40), // o
boost::none, // o2
Date_t::now(), // wall clock time
2, // statement id
@@ -587,7 +589,7 @@ TEST_F(SessionCatalogMigrationSourceTest, ReturnsDeadEndSentinelForIncompleteHis
DBDirectClient client(opCtx());
client.insert(NamespaceString::kSessionTransactionsTableNamespace.ns(), sessionRecord.toBSON());
- SessionCatalogMigrationSource migrationSource(opCtx(), kNs);
+ SessionCatalogMigrationSource migrationSource(opCtx(), kNs, kChunkRange, kShardKey);
ASSERT_TRUE(migrationSource.fetchNextOplog(opCtx()));
{
@@ -645,7 +647,7 @@ TEST_F(SessionCatalogMigrationSourceTest, ShouldAssertWhenRollbackDetected) {
DBDirectClient client(opCtx());
client.insert(NamespaceString::kSessionTransactionsTableNamespace.ns(), sessionRecord.toBSON());
- SessionCatalogMigrationSource migrationSource(opCtx(), kNs);
+ SessionCatalogMigrationSource migrationSource(opCtx(), kNs, kChunkRange, kShardKey);
ASSERT_TRUE(migrationSource.fetchNextOplog(opCtx()));
{
@@ -674,7 +676,7 @@ TEST_F(SessionCatalogMigrationSourceTest,
DBDirectClient client(opCtx());
client.insert(NamespaceString::kSessionTransactionsTableNamespace.ns(), txnRecord.toBSON());
- SessionCatalogMigrationSource migrationSource(opCtx(), kNs);
+ SessionCatalogMigrationSource migrationSource(opCtx(), kNs, kChunkRange, kShardKey);
ASSERT_TRUE(migrationSource.fetchNextOplog(opCtx()));
ASSERT_TRUE(migrationSource.hasMoreOplog());
@@ -700,7 +702,7 @@ TEST_F(SessionCatalogMigrationSourceTest,
DBDirectClient client(opCtx());
client.insert(NamespaceString::kSessionTransactionsTableNamespace.ns(), txnRecord.toBSON());
- SessionCatalogMigrationSource migrationSource(opCtx(), kNs);
+ SessionCatalogMigrationSource migrationSource(opCtx(), kNs, kChunkRange, kShardKey);
ASSERT_TRUE(migrationSource.fetchNextOplog(opCtx()));
ASSERT_TRUE(migrationSource.hasMoreOplog());
@@ -725,7 +727,7 @@ TEST_F(SessionCatalogMigrationSourceTest, InProgressTransactionEntriesShouldBeIg
DBDirectClient client(opCtx());
client.insert(NamespaceString::kSessionTransactionsTableNamespace.ns(), txnRecord.toBSON());
- SessionCatalogMigrationSource migrationSource(opCtx(), kNs);
+ SessionCatalogMigrationSource migrationSource(opCtx(), kNs, kChunkRange, kShardKey);
ASSERT_FALSE(migrationSource.fetchNextOplog(opCtx()));
ASSERT_FALSE(migrationSource.hasMoreOplog());
@@ -742,7 +744,7 @@ TEST_F(SessionCatalogMigrationSourceTest, AbortedTransactionEntriesShouldBeIgnor
DBDirectClient client(opCtx());
client.insert(NamespaceString::kSessionTransactionsTableNamespace.ns(), txnRecord.toBSON());
- SessionCatalogMigrationSource migrationSource(opCtx(), kNs);
+ SessionCatalogMigrationSource migrationSource(opCtx(), kNs, kChunkRange, kShardKey);
ASSERT_FALSE(migrationSource.fetchNextOplog(opCtx()));
ASSERT_FALSE(migrationSource.hasMoreOplog());
@@ -784,7 +786,7 @@ TEST_F(SessionCatalogMigrationSourceTest,
// Insert the 'insert' oplog entry into the oplog.
insertOplogEntry(insertOplog);
- SessionCatalogMigrationSource migrationSource(opCtx(), kNs);
+ SessionCatalogMigrationSource migrationSource(opCtx(), kNs, kChunkRange, kShardKey);
// Function to verify the oplog entry corresponding to the retryable write.
auto checkRetryableWriteEntry = [&] {
@@ -823,5 +825,284 @@ TEST_F(SessionCatalogMigrationSourceTest,
ASSERT_FALSE(migrationSource.fetchNextOplog(opCtx()));
}
+
+TEST_F(SessionCatalogMigrationSourceTest, FindAndModifyDeleteNotTouchingChunkIsIgnored) {
+ auto entry1 = makeOplogEntry(
+ repl::OpTime(Timestamp(52, 345), 2), // optime
+ repl::OpTypeEnum::kNoop, // op type
+ BSON("x" << -50), // o
+ boost::none, // o2
+ Date_t::now(), // wall clock time
+ 0, // statement id
+ repl::OpTime(Timestamp(0, 0), 0)); // optime of previous write within same transaction
+ insertOplogEntry(entry1);
+
+ auto entry2 = makeOplogEntry(
+ repl::OpTime(Timestamp(52, 346), 2), // optime
+ repl::OpTypeEnum::kDelete, // op type
+ BSON("x" << -50), // o
+ boost::none, // o2
+ Date_t::now(), // wall clock time
+ 1, // statement id
+ repl::OpTime(Timestamp(0, 0), 0), // optime of previous write within same transaction
+ entry1.getOpTime()); // pre-image optime
+ insertOplogEntry(entry2);
+
+ SessionTxnRecord sessionRecord;
+ sessionRecord.setSessionId(makeLogicalSessionIdForTest());
+ sessionRecord.setTxnNum(1);
+ sessionRecord.setLastWriteOpTime(entry2.getOpTime());
+ sessionRecord.setLastWriteDate(*entry2.getWallClockTime());
+
+ DBDirectClient client(opCtx());
+ client.insert(NamespaceString::kSessionTransactionsTableNamespace.ns(), sessionRecord.toBSON());
+
+ SessionCatalogMigrationSource migrationSource(opCtx(), kNs, kChunkRange, kShardKey);
+ ASSERT_FALSE(migrationSource.fetchNextOplog(opCtx()));
+}
+
+TEST_F(SessionCatalogMigrationSourceTest, FindAndModifyUpdatePrePostNotTouchingChunkIsIgnored) {
+ auto entry1 = makeOplogEntry(
+ repl::OpTime(Timestamp(52, 345), 2), // optime
+ repl::OpTypeEnum::kNoop, // op type
+ BSON("x" << -5), // o
+ boost::none, // o2
+ Date_t::now(), // wall clock time
+ 0, // statement id
+ repl::OpTime(Timestamp(0, 0), 0)); // optime of previous write within same transaction
+ insertOplogEntry(entry1);
+
+ auto entry2 = makeOplogEntry(
+ repl::OpTime(Timestamp(52, 346), 2), // optime
+ repl::OpTypeEnum::kUpdate, // op type
+ BSON("$set" << BSON("y" << 1)), // o
+ BSON("x" << -5), // o2
+ Date_t::now(), // wall clock time
+ 1, // statement id
+ repl::OpTime(Timestamp(0, 0), 0), // optime of previous write within same transaction
+ entry1.getOpTime()); // pre-image optime
+ insertOplogEntry(entry2);
+
+ SessionTxnRecord sessionRecord;
+ sessionRecord.setSessionId(makeLogicalSessionIdForTest());
+ sessionRecord.setTxnNum(1);
+ sessionRecord.setLastWriteOpTime(entry2.getOpTime());
+ sessionRecord.setLastWriteDate(*entry2.getWallClockTime());
+
+ DBDirectClient client(opCtx());
+ client.insert(NamespaceString::kSessionTransactionsTableNamespace.ns(), sessionRecord.toBSON());
+
+ SessionCatalogMigrationSource migrationSource(opCtx(), kNs, kChunkRange, kShardKey);
+ ASSERT_FALSE(migrationSource.fetchNextOplog(opCtx()));
+}
+
+TEST_F(SessionCatalogMigrationSourceTest,
+ UpdatePreImageTouchingPostNotTouchingChunkShouldNotBeIgnored) {
+ auto entry1 = makeOplogEntry(
+ repl::OpTime(Timestamp(52, 345), 2), // optime
+ repl::OpTypeEnum::kNoop, // op type
+ BSON("x" << -50), // o
+ boost::none, // o2
+ Date_t::now(), // wall clock time
+ 0, // statement id
+ repl::OpTime(Timestamp(0, 0), 0)); // optime of previous write within same transaction
+ insertOplogEntry(entry1);
+
+ auto entry2 = makeOplogEntry(
+ repl::OpTime(Timestamp(52, 346), 2), // optime
+ repl::OpTypeEnum::kUpdate, // op type
+ BSON("$set" << BSON("x" << -50)), // o
+ BSON("x" << 10), // o2
+ Date_t::now(), // wall clock time
+ 1, // statement id
+ repl::OpTime(Timestamp(0, 0), 0), // optime of previous write within same transaction
+ boost::none, // pre-image optime
+ entry1.getOpTime()); // post-image optime
+ insertOplogEntry(entry2);
+
+ SessionTxnRecord sessionRecord;
+ sessionRecord.setSessionId(makeLogicalSessionIdForTest());
+ sessionRecord.setTxnNum(1);
+ sessionRecord.setLastWriteOpTime(entry2.getOpTime());
+ sessionRecord.setLastWriteDate(*entry2.getWallClockTime());
+
+ DBDirectClient client(opCtx());
+ client.insert(NamespaceString::kSessionTransactionsTableNamespace.ns(), sessionRecord.toBSON());
+
+ SessionCatalogMigrationSource migrationSource(opCtx(), kNs, kChunkRange, kShardKey);
+ ASSERT_TRUE(migrationSource.fetchNextOplog(opCtx()));
+
+ auto expectedSequece = {entry1, entry2};
+
+ for (auto oplog : expectedSequece) {
+ ASSERT_TRUE(migrationSource.hasMoreOplog());
+ auto nextOplogResult = migrationSource.getLastFetchedOplog();
+ ASSERT_FALSE(nextOplogResult.shouldWaitForMajority);
+ // Cannot compare directly because of SERVER-31356
+ ASSERT_BSONOBJ_EQ(oplog.toBSON(), nextOplogResult.oplog->toBSON());
+ migrationSource.fetchNextOplog(opCtx());
+ }
+
+ ASSERT_FALSE(migrationSource.hasMoreOplog());
+}
+
+TEST_F(SessionCatalogMigrationSourceTest,
+ UpdatePreImageNotTouchingPostTouchingChunkShouldBeIgnored) {
+ auto entry1 = makeOplogEntry(
+ repl::OpTime(Timestamp(52, 345), 2), // optime
+ repl::OpTypeEnum::kNoop, // op type
+ BSON("x" << 50), // o
+ boost::none, // o2
+ Date_t::now(), // wall clock time
+ 0, // statement id
+ repl::OpTime(Timestamp(0, 0), 0)); // optime of previous write within same transaction
+ insertOplogEntry(entry1);
+
+ auto entry2 = makeOplogEntry(
+ repl::OpTime(Timestamp(52, 346), 2), // optime
+ repl::OpTypeEnum::kUpdate, // op type
+ BSON("$set" << BSON("x" << 50)), // o
+ BSON("x" << -10), // o2
+ Date_t::now(), // wall clock time
+ 1, // statement id
+ repl::OpTime(Timestamp(0, 0), 0), // optime of previous write within same transaction
+ boost::none, // pre-image optime
+ entry1.getOpTime()); // post-image optime
+ insertOplogEntry(entry2);
+
+ SessionTxnRecord sessionRecord;
+ sessionRecord.setSessionId(makeLogicalSessionIdForTest());
+ sessionRecord.setTxnNum(1);
+ sessionRecord.setLastWriteOpTime(entry2.getOpTime());
+ sessionRecord.setLastWriteDate(*entry2.getWallClockTime());
+
+ DBDirectClient client(opCtx());
+ client.insert(NamespaceString::kSessionTransactionsTableNamespace.ns(), sessionRecord.toBSON());
+
+ SessionCatalogMigrationSource migrationSource(opCtx(), kNs, kChunkRange, kShardKey);
+ ASSERT_FALSE(migrationSource.fetchNextOplog(opCtx()));
+}
+
+TEST_F(SessionCatalogMigrationSourceTest, FindAndModifyUpdateNotTouchingChunkShouldBeIgnored) {
+ auto entry1 = makeOplogEntry(
+ repl::OpTime(Timestamp(52, 345), 2), // optime
+ repl::OpTypeEnum::kNoop, // op type
+ BSON("x" << -10 << "y" << 50), // o
+ boost::none, // o2
+ Date_t::now(), // wall clock time
+ 0, // statement id
+ repl::OpTime(Timestamp(0, 0), 0)); // optime of previous write within same transaction
+ insertOplogEntry(entry1);
+
+ auto entry2 = makeOplogEntry(
+ repl::OpTime(Timestamp(52, 346), 2), // optime
+ repl::OpTypeEnum::kUpdate, // op type
+ BSON("$set" << BSON("y" << 50)), // o
+ BSON("x" << -10), // o2
+ Date_t::now(), // wall clock time
+ 1, // statement id
+ repl::OpTime(Timestamp(0, 0), 0), // optime of previous write within same transaction
+ boost::none, // pre-image optime
+ entry1.getOpTime()); // post-image optime
+ insertOplogEntry(entry2);
+
+ SessionTxnRecord sessionRecord;
+ sessionRecord.setSessionId(makeLogicalSessionIdForTest());
+ sessionRecord.setTxnNum(1);
+ sessionRecord.setLastWriteOpTime(entry2.getOpTime());
+ sessionRecord.setLastWriteDate(*entry2.getWallClockTime());
+
+ DBDirectClient client(opCtx());
+ client.insert(NamespaceString::kSessionTransactionsTableNamespace.ns(), sessionRecord.toBSON());
+
+ SessionCatalogMigrationSource migrationSource(opCtx(), kNs, kChunkRange, kShardKey);
+ ASSERT_FALSE(migrationSource.fetchNextOplog(opCtx()));
+}
+
+TEST_F(SessionCatalogMigrationSourceTest, TwoSessionWithTwoWritesContainingWriteNotInChunk) {
+ auto sessionId1 = makeLogicalSessionIdForTest();
+ auto sessionId2 = makeLogicalSessionIdForTest();
+
+ auto cmpResult = sessionId1.toBSON().woCompare(sessionId2.toBSON());
+ auto lowerSessionId = (cmpResult < 0) ? sessionId1 : sessionId2;
+ auto higherSessionId = (cmpResult < 0) ? sessionId2 : sessionId1;
+
+ auto entry1a = makeOplogEntry(
+ repl::OpTime(Timestamp(52, 345), 2), // optime
+ repl::OpTypeEnum::kInsert, // op type
+ BSON("x" << 30), // o
+ boost::none, // o2
+ Date_t::now(), // wall clock time
+ 0, // statement id
+ repl::OpTime(Timestamp(0, 0), 0)); // optime of previous write within same transaction
+
+ auto entry1b =
+ makeOplogEntry(repl::OpTime(Timestamp(67, 54801), 2), // optime
+ repl::OpTypeEnum::kInsert, // op type
+ BSON("x" << -50), // o
+ boost::none, // o2
+ Date_t::now(), // wall clock time
+ 1, // statement id
+ entry1a.getOpTime()); // optime of previous write within same transaction
+
+ SessionTxnRecord sessionRecord1;
+ sessionRecord1.setSessionId(higherSessionId);
+ sessionRecord1.setTxnNum(1);
+ sessionRecord1.setLastWriteOpTime(entry1b.getOpTime());
+ sessionRecord1.setLastWriteDate(*entry1b.getWallClockTime());
+
+ DBDirectClient client(opCtx());
+ client.insert(NamespaceString::kSessionTransactionsTableNamespace.ns(),
+ sessionRecord1.toBSON());
+
+ auto entry2a = makeOplogEntry(
+ repl::OpTime(Timestamp(43, 12), 2), // optime
+ repl::OpTypeEnum::kDelete, // op type
+ BSON("x" << 30), // o
+ boost::none, // o2
+ Date_t::now(), // wall clock time
+ 3, // statement id
+ repl::OpTime(Timestamp(0, 0), 0)); // optime of previous write within same transaction
+
+ auto entry2b =
+ makeOplogEntry(repl::OpTime(Timestamp(789, 13), 2), // optime
+ repl::OpTypeEnum::kDelete, // op type
+ BSON("x" << 50), // o
+ boost::none, // o2
+ Date_t::now(), // wall clock time
+ 4, // statement id
+ entry2a.getOpTime()); // optime of previous write within same transaction
+
+ SessionTxnRecord sessionRecord2;
+ sessionRecord2.setSessionId(lowerSessionId);
+ sessionRecord2.setTxnNum(1);
+ sessionRecord2.setLastWriteOpTime(entry2b.getOpTime());
+ sessionRecord2.setLastWriteDate(*entry2b.getWallClockTime());
+
+ client.insert(NamespaceString::kSessionTransactionsTableNamespace.ns(),
+ sessionRecord2.toBSON());
+
+ insertOplogEntry(entry2a);
+ insertOplogEntry(entry1a);
+ insertOplogEntry(entry1b);
+ insertOplogEntry(entry2b);
+
+ SessionCatalogMigrationSource migrationSource(opCtx(), kNs, kChunkRange, kShardKey);
+ ASSERT_TRUE(migrationSource.fetchNextOplog(opCtx()));
+
+ auto expectedSequece = {entry1a, entry2b, entry2a};
+
+ for (auto oplog : expectedSequece) {
+ ASSERT_TRUE(migrationSource.hasMoreOplog());
+ auto nextOplogResult = migrationSource.getLastFetchedOplog();
+ ASSERT_FALSE(nextOplogResult.shouldWaitForMajority);
+ // Cannot compare directly because of SERVER-31356
+ ASSERT_BSONOBJ_EQ(oplog.toBSON(), nextOplogResult.oplog->toBSON());
+ migrationSource.fetchNextOplog(opCtx());
+ }
+
+ ASSERT_FALSE(migrationSource.hasMoreOplog());
+}
+
} // namespace
} // namespace mongo