diff options
author | Randolph Tan <randolph@10gen.com> | 2019-05-15 16:54:35 -0400 |
---|---|---|
committer | Randolph Tan <randolph@10gen.com> | 2019-05-17 16:49:22 -0400 |
commit | 32ade4aa6b6a73c3620486117388908ad9ad438c (patch) | |
tree | 757054f3a8c8bb898aeef89a9af8feabe7c9d215 /src/mongo/db/s/session_catalog_migration_source_test.cpp | |
parent | 57c127b9040a1203a86214cf0bf896aa069afbbe (diff) | |
download | mongo-32ade4aa6b6a73c3620486117388908ad9ad438c.tar.gz |
SERVER-41074 Make decision to send pre/post image oplog the same as the decision made for the originating oplog write.
Diffstat (limited to 'src/mongo/db/s/session_catalog_migration_source_test.cpp')
-rw-r--r-- | src/mongo/db/s/session_catalog_migration_source_test.cpp | 327 |
1 files changed, 304 insertions, 23 deletions
diff --git a/src/mongo/db/s/session_catalog_migration_source_test.cpp b/src/mongo/db/s/session_catalog_migration_source_test.cpp index c4fabc0b443..16492fbb17c 100644 --- a/src/mongo/db/s/session_catalog_migration_source_test.cpp +++ b/src/mongo/db/s/session_catalog_migration_source_test.cpp @@ -46,6 +46,8 @@ namespace { using executor::RemoteCommandRequest; const NamespaceString kNs("a.b"); +const KeyPattern kShardKey(BSON("x" << 1)); +const ChunkRange kChunkRange(BSON("x" << 0), BSON("x" << 100)); class SessionCatalogMigrationSourceTest : public MockReplCoordServerFixture {}; @@ -107,7 +109,7 @@ repl::OplogEntry makeOplogEntry(repl::OpTime opTime, } TEST_F(SessionCatalogMigrationSourceTest, NoSessionsToTransferShouldNotHaveOplog) { - SessionCatalogMigrationSource migrationSource(opCtx(), kNs); + SessionCatalogMigrationSource migrationSource(opCtx(), kNs, kChunkRange, kShardKey); ASSERT_FALSE(migrationSource.fetchNextOplog(opCtx())); ASSERT_FALSE(migrationSource.hasMoreOplog()); } @@ -126,7 +128,7 @@ TEST_F(SessionCatalogMigrationSourceTest, OneSessionWithTwoWrites) { auto entry2 = makeOplogEntry(repl::OpTime(Timestamp(67, 54801), 2), // optime repl::OpTypeEnum::kInsert, // op type - BSON("y" << 50), // o + BSON("x" << 50), // o boost::none, // o2 Date_t::now(), // wall clock time 1, // statement id @@ -142,7 +144,7 @@ TEST_F(SessionCatalogMigrationSourceTest, OneSessionWithTwoWrites) { DBDirectClient client(opCtx()); client.insert(NamespaceString::kSessionTransactionsTableNamespace.ns(), sessionRecord.toBSON()); - SessionCatalogMigrationSource migrationSource(opCtx(), kNs); + SessionCatalogMigrationSource migrationSource(opCtx(), kNs, kChunkRange, kShardKey); ASSERT_TRUE(migrationSource.fetchNextOplog(opCtx())); { @@ -179,7 +181,7 @@ TEST_F(SessionCatalogMigrationSourceTest, TwoSessionWithTwoWrites) { auto entry1b = makeOplogEntry(repl::OpTime(Timestamp(67, 54801), 2), // optime repl::OpTypeEnum::kInsert, // op type - BSON("y" << 50), // o + BSON("x" << 50), // o boost::none, // o2 Date_t::now(), // wall clock time 1, // statement id @@ -207,7 +209,7 @@ TEST_F(SessionCatalogMigrationSourceTest, TwoSessionWithTwoWrites) { auto entry2b = makeOplogEntry(repl::OpTime(Timestamp(789, 13), 2), // optime repl::OpTypeEnum::kDelete, // op type - BSON("y" << 50), // o + BSON("x" << 50), // o boost::none, // o2 Date_t::now(), // wall clock time 4, // statement id @@ -227,7 +229,7 @@ TEST_F(SessionCatalogMigrationSourceTest, TwoSessionWithTwoWrites) { insertOplogEntry(entry1b); insertOplogEntry(entry2b); - SessionCatalogMigrationSource migrationSource(opCtx(), kNs); + SessionCatalogMigrationSource migrationSource(opCtx(), kNs, kChunkRange, kShardKey); ASSERT_TRUE(migrationSource.fetchNextOplog(opCtx())); auto checkNextBatch = [this, &migrationSource](const repl::OplogEntry& firstExpectedOplog, @@ -284,7 +286,7 @@ TEST_F(SessionCatalogMigrationSourceTest, OneSessionWithFindAndModifyPreImageAnd auto entry2 = makeOplogEntry( repl::OpTime(Timestamp(52, 346), 2), // optime repl::OpTypeEnum::kDelete, // op type - BSON("y" << 50), // o + BSON("x" << 50), // o boost::none, // o2 Date_t::now(), // wall clock time 1, // statement id @@ -305,8 +307,8 @@ TEST_F(SessionCatalogMigrationSourceTest, OneSessionWithFindAndModifyPreImageAnd auto entry4 = makeOplogEntry(repl::OpTime(Timestamp(73, 6), 2), // optime repl::OpTypeEnum::kUpdate, // op type - BSON("x" << 19), // o - BSON("$inc" << BSON("x" << 1)), // o2 + BSON("$inc" << BSON("x" << 1)), // o + BSON("x" << 19), // o2 Date_t::now(), // wall clock time 3, // statement id entry2.getOpTime(), // optime of previous write within same transaction @@ -323,7 +325,7 @@ TEST_F(SessionCatalogMigrationSourceTest, OneSessionWithFindAndModifyPreImageAnd DBDirectClient client(opCtx()); client.insert(NamespaceString::kSessionTransactionsTableNamespace.ns(), sessionRecord.toBSON()); - SessionCatalogMigrationSource migrationSource(opCtx(), kNs); + SessionCatalogMigrationSource migrationSource(opCtx(), kNs, kChunkRange, kShardKey); ASSERT_TRUE(migrationSource.fetchNextOplog(opCtx())); auto expectedSequece = {entry3, entry4, entry1, entry2}; @@ -385,7 +387,7 @@ TEST_F(SessionCatalogMigrationSourceTest, OplogWithOtherNsShouldBeIgnored) { client.insert(NamespaceString::kSessionTransactionsTableNamespace.ns(), sessionRecord2.toBSON()); - SessionCatalogMigrationSource migrationSource(opCtx(), kNs); + SessionCatalogMigrationSource migrationSource(opCtx(), kNs, kChunkRange, kShardKey); ASSERT_TRUE(migrationSource.fetchNextOplog(opCtx())); ASSERT_TRUE(migrationSource.hasMoreOplog()); @@ -433,14 +435,14 @@ TEST_F(SessionCatalogMigrationSourceTest, SessionDumpWithMultipleNewWrites) { auto entry3 = makeOplogEntry( repl::OpTime(Timestamp(55, 12), 2), // optime repl::OpTypeEnum::kInsert, // op type - BSON("z" << 40), // o + BSON("x" << 40), // o boost::none, // o2 Date_t::now(), // wall clock time 2, // statement id repl::OpTime(Timestamp(0, 0), 0)); // optime of previous write within same transaction insertOplogEntry(entry3); - SessionCatalogMigrationSource migrationSource(opCtx(), kNs); + SessionCatalogMigrationSource migrationSource(opCtx(), kNs, kChunkRange, kShardKey); ASSERT_TRUE(migrationSource.fetchNextOplog(opCtx())); migrationSource.notifyNewWriteOpTime( @@ -477,7 +479,7 @@ TEST_F(SessionCatalogMigrationSourceTest, SessionDumpWithMultipleNewWrites) { } TEST_F(SessionCatalogMigrationSourceTest, ShouldAssertIfOplogCannotBeFound) { - SessionCatalogMigrationSource migrationSource(opCtx(), kNs); + SessionCatalogMigrationSource migrationSource(opCtx(), kNs, kChunkRange, kShardKey); ASSERT_FALSE(migrationSource.fetchNextOplog(opCtx())); migrationSource.notifyNewWriteOpTime( @@ -488,7 +490,7 @@ TEST_F(SessionCatalogMigrationSourceTest, ShouldAssertIfOplogCannotBeFound) { } TEST_F(SessionCatalogMigrationSourceTest, ShouldBeAbleInsertNewWritesAfterBufferWasDepleted) { - SessionCatalogMigrationSource migrationSource(opCtx(), kNs); + SessionCatalogMigrationSource migrationSource(opCtx(), kNs, kChunkRange, kShardKey); ASSERT_FALSE(migrationSource.fetchNextOplog(opCtx())); { @@ -544,7 +546,7 @@ TEST_F(SessionCatalogMigrationSourceTest, ShouldBeAbleInsertNewWritesAfterBuffer auto entry = makeOplogEntry( repl::OpTime(Timestamp(55, 12), 2), // optime repl::OpTypeEnum::kInsert, // op type - BSON("z" << 40), // o + BSON("x" << 40), // o boost::none, // o2 Date_t::now(), // wall clock time 2, // statement id @@ -587,7 +589,7 @@ TEST_F(SessionCatalogMigrationSourceTest, ReturnsDeadEndSentinelForIncompleteHis DBDirectClient client(opCtx()); client.insert(NamespaceString::kSessionTransactionsTableNamespace.ns(), sessionRecord.toBSON()); - SessionCatalogMigrationSource migrationSource(opCtx(), kNs); + SessionCatalogMigrationSource migrationSource(opCtx(), kNs, kChunkRange, kShardKey); ASSERT_TRUE(migrationSource.fetchNextOplog(opCtx())); { @@ -645,7 +647,7 @@ TEST_F(SessionCatalogMigrationSourceTest, ShouldAssertWhenRollbackDetected) { DBDirectClient client(opCtx()); client.insert(NamespaceString::kSessionTransactionsTableNamespace.ns(), sessionRecord.toBSON()); - SessionCatalogMigrationSource migrationSource(opCtx(), kNs); + SessionCatalogMigrationSource migrationSource(opCtx(), kNs, kChunkRange, kShardKey); ASSERT_TRUE(migrationSource.fetchNextOplog(opCtx())); { @@ -674,7 +676,7 @@ TEST_F(SessionCatalogMigrationSourceTest, DBDirectClient client(opCtx()); client.insert(NamespaceString::kSessionTransactionsTableNamespace.ns(), txnRecord.toBSON()); - SessionCatalogMigrationSource migrationSource(opCtx(), kNs); + SessionCatalogMigrationSource migrationSource(opCtx(), kNs, kChunkRange, kShardKey); ASSERT_TRUE(migrationSource.fetchNextOplog(opCtx())); ASSERT_TRUE(migrationSource.hasMoreOplog()); @@ -700,7 +702,7 @@ TEST_F(SessionCatalogMigrationSourceTest, DBDirectClient client(opCtx()); client.insert(NamespaceString::kSessionTransactionsTableNamespace.ns(), txnRecord.toBSON()); - SessionCatalogMigrationSource migrationSource(opCtx(), kNs); + SessionCatalogMigrationSource migrationSource(opCtx(), kNs, kChunkRange, kShardKey); ASSERT_TRUE(migrationSource.fetchNextOplog(opCtx())); ASSERT_TRUE(migrationSource.hasMoreOplog()); @@ -725,7 +727,7 @@ TEST_F(SessionCatalogMigrationSourceTest, InProgressTransactionEntriesShouldBeIg DBDirectClient client(opCtx()); client.insert(NamespaceString::kSessionTransactionsTableNamespace.ns(), txnRecord.toBSON()); - SessionCatalogMigrationSource migrationSource(opCtx(), kNs); + SessionCatalogMigrationSource migrationSource(opCtx(), kNs, kChunkRange, kShardKey); ASSERT_FALSE(migrationSource.fetchNextOplog(opCtx())); ASSERT_FALSE(migrationSource.hasMoreOplog()); @@ -742,7 +744,7 @@ TEST_F(SessionCatalogMigrationSourceTest, AbortedTransactionEntriesShouldBeIgnor DBDirectClient client(opCtx()); client.insert(NamespaceString::kSessionTransactionsTableNamespace.ns(), txnRecord.toBSON()); - SessionCatalogMigrationSource migrationSource(opCtx(), kNs); + SessionCatalogMigrationSource migrationSource(opCtx(), kNs, kChunkRange, kShardKey); ASSERT_FALSE(migrationSource.fetchNextOplog(opCtx())); ASSERT_FALSE(migrationSource.hasMoreOplog()); @@ -784,7 +786,7 @@ TEST_F(SessionCatalogMigrationSourceTest, // Insert the 'insert' oplog entry into the oplog. insertOplogEntry(insertOplog); - SessionCatalogMigrationSource migrationSource(opCtx(), kNs); + SessionCatalogMigrationSource migrationSource(opCtx(), kNs, kChunkRange, kShardKey); // Function to verify the oplog entry corresponding to the retryable write. auto checkRetryableWriteEntry = [&] { @@ -823,5 +825,284 @@ TEST_F(SessionCatalogMigrationSourceTest, ASSERT_FALSE(migrationSource.fetchNextOplog(opCtx())); } + +TEST_F(SessionCatalogMigrationSourceTest, FindAndModifyDeleteNotTouchingChunkIsIgnored) { + auto entry1 = makeOplogEntry( + repl::OpTime(Timestamp(52, 345), 2), // optime + repl::OpTypeEnum::kNoop, // op type + BSON("x" << -50), // o + boost::none, // o2 + Date_t::now(), // wall clock time + 0, // statement id + repl::OpTime(Timestamp(0, 0), 0)); // optime of previous write within same transaction + insertOplogEntry(entry1); + + auto entry2 = makeOplogEntry( + repl::OpTime(Timestamp(52, 346), 2), // optime + repl::OpTypeEnum::kDelete, // op type + BSON("x" << -50), // o + boost::none, // o2 + Date_t::now(), // wall clock time + 1, // statement id + repl::OpTime(Timestamp(0, 0), 0), // optime of previous write within same transaction + entry1.getOpTime()); // pre-image optime + insertOplogEntry(entry2); + + SessionTxnRecord sessionRecord; + sessionRecord.setSessionId(makeLogicalSessionIdForTest()); + sessionRecord.setTxnNum(1); + sessionRecord.setLastWriteOpTime(entry2.getOpTime()); + sessionRecord.setLastWriteDate(*entry2.getWallClockTime()); + + DBDirectClient client(opCtx()); + client.insert(NamespaceString::kSessionTransactionsTableNamespace.ns(), sessionRecord.toBSON()); + + SessionCatalogMigrationSource migrationSource(opCtx(), kNs, kChunkRange, kShardKey); + ASSERT_FALSE(migrationSource.fetchNextOplog(opCtx())); +} + +TEST_F(SessionCatalogMigrationSourceTest, FindAndModifyUpdatePrePostNotTouchingChunkIsIgnored) { + auto entry1 = makeOplogEntry( + repl::OpTime(Timestamp(52, 345), 2), // optime + repl::OpTypeEnum::kNoop, // op type + BSON("x" << -5), // o + boost::none, // o2 + Date_t::now(), // wall clock time + 0, // statement id + repl::OpTime(Timestamp(0, 0), 0)); // optime of previous write within same transaction + insertOplogEntry(entry1); + + auto entry2 = makeOplogEntry( + repl::OpTime(Timestamp(52, 346), 2), // optime + repl::OpTypeEnum::kUpdate, // op type + BSON("$set" << BSON("y" << 1)), // o + BSON("x" << -5), // o2 + Date_t::now(), // wall clock time + 1, // statement id + repl::OpTime(Timestamp(0, 0), 0), // optime of previous write within same transaction + entry1.getOpTime()); // pre-image optime + insertOplogEntry(entry2); + + SessionTxnRecord sessionRecord; + sessionRecord.setSessionId(makeLogicalSessionIdForTest()); + sessionRecord.setTxnNum(1); + sessionRecord.setLastWriteOpTime(entry2.getOpTime()); + sessionRecord.setLastWriteDate(*entry2.getWallClockTime()); + + DBDirectClient client(opCtx()); + client.insert(NamespaceString::kSessionTransactionsTableNamespace.ns(), sessionRecord.toBSON()); + + SessionCatalogMigrationSource migrationSource(opCtx(), kNs, kChunkRange, kShardKey); + ASSERT_FALSE(migrationSource.fetchNextOplog(opCtx())); +} + +TEST_F(SessionCatalogMigrationSourceTest, + UpdatePreImageTouchingPostNotTouchingChunkShouldNotBeIgnored) { + auto entry1 = makeOplogEntry( + repl::OpTime(Timestamp(52, 345), 2), // optime + repl::OpTypeEnum::kNoop, // op type + BSON("x" << -50), // o + boost::none, // o2 + Date_t::now(), // wall clock time + 0, // statement id + repl::OpTime(Timestamp(0, 0), 0)); // optime of previous write within same transaction + insertOplogEntry(entry1); + + auto entry2 = makeOplogEntry( + repl::OpTime(Timestamp(52, 346), 2), // optime + repl::OpTypeEnum::kUpdate, // op type + BSON("$set" << BSON("x" << -50)), // o + BSON("x" << 10), // o2 + Date_t::now(), // wall clock time + 1, // statement id + repl::OpTime(Timestamp(0, 0), 0), // optime of previous write within same transaction + boost::none, // pre-image optime + entry1.getOpTime()); // post-image optime + insertOplogEntry(entry2); + + SessionTxnRecord sessionRecord; + sessionRecord.setSessionId(makeLogicalSessionIdForTest()); + sessionRecord.setTxnNum(1); + sessionRecord.setLastWriteOpTime(entry2.getOpTime()); + sessionRecord.setLastWriteDate(*entry2.getWallClockTime()); + + DBDirectClient client(opCtx()); + client.insert(NamespaceString::kSessionTransactionsTableNamespace.ns(), sessionRecord.toBSON()); + + SessionCatalogMigrationSource migrationSource(opCtx(), kNs, kChunkRange, kShardKey); + ASSERT_TRUE(migrationSource.fetchNextOplog(opCtx())); + + auto expectedSequece = {entry1, entry2}; + + for (auto oplog : expectedSequece) { + ASSERT_TRUE(migrationSource.hasMoreOplog()); + auto nextOplogResult = migrationSource.getLastFetchedOplog(); + ASSERT_FALSE(nextOplogResult.shouldWaitForMajority); + // Cannot compare directly because of SERVER-31356 + ASSERT_BSONOBJ_EQ(oplog.toBSON(), nextOplogResult.oplog->toBSON()); + migrationSource.fetchNextOplog(opCtx()); + } + + ASSERT_FALSE(migrationSource.hasMoreOplog()); +} + +TEST_F(SessionCatalogMigrationSourceTest, + UpdatePreImageNotTouchingPostTouchingChunkShouldBeIgnored) { + auto entry1 = makeOplogEntry( + repl::OpTime(Timestamp(52, 345), 2), // optime + repl::OpTypeEnum::kNoop, // op type + BSON("x" << 50), // o + boost::none, // o2 + Date_t::now(), // wall clock time + 0, // statement id + repl::OpTime(Timestamp(0, 0), 0)); // optime of previous write within same transaction + insertOplogEntry(entry1); + + auto entry2 = makeOplogEntry( + repl::OpTime(Timestamp(52, 346), 2), // optime + repl::OpTypeEnum::kUpdate, // op type + BSON("$set" << BSON("x" << 50)), // o + BSON("x" << -10), // o2 + Date_t::now(), // wall clock time + 1, // statement id + repl::OpTime(Timestamp(0, 0), 0), // optime of previous write within same transaction + boost::none, // pre-image optime + entry1.getOpTime()); // post-image optime + insertOplogEntry(entry2); + + SessionTxnRecord sessionRecord; + sessionRecord.setSessionId(makeLogicalSessionIdForTest()); + sessionRecord.setTxnNum(1); + sessionRecord.setLastWriteOpTime(entry2.getOpTime()); + sessionRecord.setLastWriteDate(*entry2.getWallClockTime()); + + DBDirectClient client(opCtx()); + client.insert(NamespaceString::kSessionTransactionsTableNamespace.ns(), sessionRecord.toBSON()); + + SessionCatalogMigrationSource migrationSource(opCtx(), kNs, kChunkRange, kShardKey); + ASSERT_FALSE(migrationSource.fetchNextOplog(opCtx())); +} + +TEST_F(SessionCatalogMigrationSourceTest, FindAndModifyUpdateNotTouchingChunkShouldBeIgnored) { + auto entry1 = makeOplogEntry( + repl::OpTime(Timestamp(52, 345), 2), // optime + repl::OpTypeEnum::kNoop, // op type + BSON("x" << -10 << "y" << 50), // o + boost::none, // o2 + Date_t::now(), // wall clock time + 0, // statement id + repl::OpTime(Timestamp(0, 0), 0)); // optime of previous write within same transaction + insertOplogEntry(entry1); + + auto entry2 = makeOplogEntry( + repl::OpTime(Timestamp(52, 346), 2), // optime + repl::OpTypeEnum::kUpdate, // op type + BSON("$set" << BSON("y" << 50)), // o + BSON("x" << -10), // o2 + Date_t::now(), // wall clock time + 1, // statement id + repl::OpTime(Timestamp(0, 0), 0), // optime of previous write within same transaction + boost::none, // pre-image optime + entry1.getOpTime()); // post-image optime + insertOplogEntry(entry2); + + SessionTxnRecord sessionRecord; + sessionRecord.setSessionId(makeLogicalSessionIdForTest()); + sessionRecord.setTxnNum(1); + sessionRecord.setLastWriteOpTime(entry2.getOpTime()); + sessionRecord.setLastWriteDate(*entry2.getWallClockTime()); + + DBDirectClient client(opCtx()); + client.insert(NamespaceString::kSessionTransactionsTableNamespace.ns(), sessionRecord.toBSON()); + + SessionCatalogMigrationSource migrationSource(opCtx(), kNs, kChunkRange, kShardKey); + ASSERT_FALSE(migrationSource.fetchNextOplog(opCtx())); +} + +TEST_F(SessionCatalogMigrationSourceTest, TwoSessionWithTwoWritesContainingWriteNotInChunk) { + auto sessionId1 = makeLogicalSessionIdForTest(); + auto sessionId2 = makeLogicalSessionIdForTest(); + + auto cmpResult = sessionId1.toBSON().woCompare(sessionId2.toBSON()); + auto lowerSessionId = (cmpResult < 0) ? sessionId1 : sessionId2; + auto higherSessionId = (cmpResult < 0) ? sessionId2 : sessionId1; + + auto entry1a = makeOplogEntry( + repl::OpTime(Timestamp(52, 345), 2), // optime + repl::OpTypeEnum::kInsert, // op type + BSON("x" << 30), // o + boost::none, // o2 + Date_t::now(), // wall clock time + 0, // statement id + repl::OpTime(Timestamp(0, 0), 0)); // optime of previous write within same transaction + + auto entry1b = + makeOplogEntry(repl::OpTime(Timestamp(67, 54801), 2), // optime + repl::OpTypeEnum::kInsert, // op type + BSON("x" << -50), // o + boost::none, // o2 + Date_t::now(), // wall clock time + 1, // statement id + entry1a.getOpTime()); // optime of previous write within same transaction + + SessionTxnRecord sessionRecord1; + sessionRecord1.setSessionId(higherSessionId); + sessionRecord1.setTxnNum(1); + sessionRecord1.setLastWriteOpTime(entry1b.getOpTime()); + sessionRecord1.setLastWriteDate(*entry1b.getWallClockTime()); + + DBDirectClient client(opCtx()); + client.insert(NamespaceString::kSessionTransactionsTableNamespace.ns(), + sessionRecord1.toBSON()); + + auto entry2a = makeOplogEntry( + repl::OpTime(Timestamp(43, 12), 2), // optime + repl::OpTypeEnum::kDelete, // op type + BSON("x" << 30), // o + boost::none, // o2 + Date_t::now(), // wall clock time + 3, // statement id + repl::OpTime(Timestamp(0, 0), 0)); // optime of previous write within same transaction + + auto entry2b = + makeOplogEntry(repl::OpTime(Timestamp(789, 13), 2), // optime + repl::OpTypeEnum::kDelete, // op type + BSON("x" << 50), // o + boost::none, // o2 + Date_t::now(), // wall clock time + 4, // statement id + entry2a.getOpTime()); // optime of previous write within same transaction + + SessionTxnRecord sessionRecord2; + sessionRecord2.setSessionId(lowerSessionId); + sessionRecord2.setTxnNum(1); + sessionRecord2.setLastWriteOpTime(entry2b.getOpTime()); + sessionRecord2.setLastWriteDate(*entry2b.getWallClockTime()); + + client.insert(NamespaceString::kSessionTransactionsTableNamespace.ns(), + sessionRecord2.toBSON()); + + insertOplogEntry(entry2a); + insertOplogEntry(entry1a); + insertOplogEntry(entry1b); + insertOplogEntry(entry2b); + + SessionCatalogMigrationSource migrationSource(opCtx(), kNs, kChunkRange, kShardKey); + ASSERT_TRUE(migrationSource.fetchNextOplog(opCtx())); + + auto expectedSequece = {entry1a, entry2b, entry2a}; + + for (auto oplog : expectedSequece) { + ASSERT_TRUE(migrationSource.hasMoreOplog()); + auto nextOplogResult = migrationSource.getLastFetchedOplog(); + ASSERT_FALSE(nextOplogResult.shouldWaitForMajority); + // Cannot compare directly because of SERVER-31356 + ASSERT_BSONOBJ_EQ(oplog.toBSON(), nextOplogResult.oplog->toBSON()); + migrationSource.fetchNextOplog(opCtx()); + } + + ASSERT_FALSE(migrationSource.hasMoreOplog()); +} + } // namespace } // namespace mongo |