diff options
author | Randolph Tan <randolph@10gen.com> | 2017-10-11 13:32:15 -0400 |
---|---|---|
committer | Randolph Tan <randolph@10gen.com> | 2017-10-25 01:20:09 -0400 |
commit | 67f735e6705091659e2a8cf46a9285f09bcf749a (patch) | |
tree | ffcaa27ecec2babe0c2dba0452a6866a5a3ed9f3 /src/mongo/db/s/session_catalog_migration_source_test.cpp | |
parent | 5b9b9a9f04b06109b77b5522f7318c366deecf6f (diff) | |
download | mongo-67f735e6705091659e2a8cf46a9285f09bcf749a.tar.gz |
SERVER-30880 Handle migration of sessions with incomplete history
Diffstat (limited to 'src/mongo/db/s/session_catalog_migration_source_test.cpp')
-rw-r--r-- | src/mongo/db/s/session_catalog_migration_source_test.cpp | 131 |
1 files changed, 125 insertions, 6 deletions
diff --git a/src/mongo/db/s/session_catalog_migration_source_test.cpp b/src/mongo/db/s/session_catalog_migration_source_test.cpp index c1e707c0c9f..2bd48ce08dc 100644 --- a/src/mongo/db/s/session_catalog_migration_source_test.cpp +++ b/src/mongo/db/s/session_catalog_migration_source_test.cpp @@ -31,7 +31,9 @@ #include "mongo/db/dbdirectclient.h" #include "mongo/db/logical_session_id.h" #include "mongo/db/repl/mock_repl_coord_server_fixture.h" +#include "mongo/db/repl/replication_process.h" #include "mongo/db/s/session_catalog_migration_source.h" +#include "mongo/db/session.h" #include "mongo/db/session_txn_record_gen.h" #include "mongo/executor/remote_command_request.h" #include "mongo/unittest/unittest.h" @@ -58,11 +60,13 @@ TEST_F(SessionCatalogMigrationSourceTest, OneSessionWithTwoWrites) { repl::OplogEntry entry1( repl::OpTime(Timestamp(52, 345), 2), 0, repl::OpTypeEnum::kInsert, kNs, BSON("x" << 30)); entry1.setPrevWriteOpTimeInTransaction(repl::OpTime(Timestamp(0, 0), 0)); + entry1.setStatementId(0); insertOplogEntry(entry1); repl::OplogEntry entry2( repl::OpTime(Timestamp(67, 54801), 2), 0, repl::OpTypeEnum::kInsert, kNs, BSON("y" << 50)); entry2.setPrevWriteOpTimeInTransaction(entry1.getOpTime()); + entry2.setStatementId(1); insertOplogEntry(entry2); SessionTxnRecord sessionRecord; @@ -104,9 +108,11 @@ TEST_F(SessionCatalogMigrationSourceTest, TwoSessionWithTwoWrites) { repl::OplogEntry entry1a( repl::OpTime(Timestamp(52, 345), 2), 0, repl::OpTypeEnum::kInsert, kNs, BSON("x" << 30)); entry1a.setPrevWriteOpTimeInTransaction(repl::OpTime(Timestamp(0, 0), 0)); + entry1a.setStatementId(0); repl::OplogEntry entry1b( repl::OpTime(Timestamp(67, 54801), 2), 0, repl::OpTypeEnum::kInsert, kNs, BSON("y" << 50)); + entry1b.setStatementId(1); entry1b.setPrevWriteOpTimeInTransaction(entry1a.getOpTime()); SessionTxnRecord sessionRecord1; @@ -121,10 +127,12 @@ TEST_F(SessionCatalogMigrationSourceTest, TwoSessionWithTwoWrites) { repl::OplogEntry entry2a( repl::OpTime(Timestamp(43, 12), 2), 0, repl::OpTypeEnum::kDelete, kNs, BSON("x" << 30)); entry2a.setPrevWriteOpTimeInTransaction(repl::OpTime(Timestamp(0, 0), 0)); + entry2a.setStatementId(3); repl::OplogEntry entry2b( repl::OpTime(Timestamp(789, 13), 2), 0, repl::OpTypeEnum::kDelete, kNs, BSON("y" << 50)); entry2b.setPrevWriteOpTimeInTransaction(entry2a.getOpTime()); + entry2b.setStatementId(4); SessionTxnRecord sessionRecord2; sessionRecord2.setSessionId(makeLogicalSessionIdForTest()); @@ -162,15 +170,23 @@ TEST_F(SessionCatalogMigrationSourceTest, TwoSessionWithTwoWrites) { } }; - checkNextBatch(entry1b, entry1a); + if (sessionRecord1.getSessionId().toBSON().woCompare(sessionRecord2.getSessionId().toBSON()) < + 0) { + checkNextBatch(entry2b, entry2a); - ASSERT_TRUE(migrationSource.fetchNextOplog(opCtx())); - ASSERT_TRUE(migrationSource.hasMoreOplog()); + ASSERT_TRUE(migrationSource.fetchNextOplog(opCtx())); + ASSERT_TRUE(migrationSource.hasMoreOplog()); - checkNextBatch(entry2b, entry2a); + checkNextBatch(entry1b, entry1a); - ASSERT_FALSE(migrationSource.fetchNextOplog(opCtx())); - ASSERT_FALSE(migrationSource.hasMoreOplog()); + } else { + checkNextBatch(entry1b, entry1a); + + ASSERT_TRUE(migrationSource.fetchNextOplog(opCtx())); + ASSERT_TRUE(migrationSource.hasMoreOplog()); + + checkNextBatch(entry2b, entry2a); + } } // It is currently not possible to have 2 findAndModify operations in one transaction, but this @@ -181,17 +197,20 @@ TEST_F(SessionCatalogMigrationSourceTest, OneSessionWithFindAndModifyPreImageAnd repl::OplogEntry entry1( repl::OpTime(Timestamp(52, 345), 2), 0, repl::OpTypeEnum::kNoop, kNs, BSON("x" << 30)); entry1.setPrevWriteOpTimeInTransaction(repl::OpTime(Timestamp(0, 0), 0)); + entry1.setStatementId(0); insertOplogEntry(entry1); repl::OplogEntry entry2( repl::OpTime(Timestamp(52, 346), 2), 0, repl::OpTypeEnum::kDelete, kNs, BSON("y" << 50)); entry2.setPrevWriteOpTimeInTransaction(repl::OpTime(Timestamp(0, 0), 0)); entry2.setPreImageOpTime(entry1.getOpTime()); + entry2.setStatementId(1); insertOplogEntry(entry2); repl::OplogEntry entry3( repl::OpTime(Timestamp(73, 5), 2), 0, repl::OpTypeEnum::kNoop, kNs, BSON("x" << 20)); entry3.setPrevWriteOpTimeInTransaction(repl::OpTime(Timestamp(0, 0), 0)); + entry3.setStatementId(2); insertOplogEntry(entry3); repl::OplogEntry entry4(repl::OpTime(Timestamp(73, 6), 2), @@ -202,6 +221,7 @@ TEST_F(SessionCatalogMigrationSourceTest, OneSessionWithFindAndModifyPreImageAnd BSON("$inc" << BSON("x" << 1))); entry4.setPrevWriteOpTimeInTransaction(entry2.getOpTime()); entry4.setPostImageOpTime(entry3.getOpTime()); + entry4.setStatementId(3); insertOplogEntry(entry4); SessionTxnRecord sessionRecord; @@ -236,6 +256,7 @@ TEST_F(SessionCatalogMigrationSourceTest, OplogWithOtherNsShouldBeIgnored) { repl::OplogEntry entry1( repl::OpTime(Timestamp(52, 345), 2), 0, repl::OpTypeEnum::kInsert, kNs, BSON("x" << 30)); entry1.setPrevWriteOpTimeInTransaction(repl::OpTime(Timestamp(0, 0), 0)); + entry1.setStatementId(0); insertOplogEntry(entry1); SessionTxnRecord sessionRecord1; @@ -254,6 +275,7 @@ TEST_F(SessionCatalogMigrationSourceTest, OplogWithOtherNsShouldBeIgnored) { NamespaceString("x.y"), BSON("x" << 30)); entry2.setPrevWriteOpTimeInTransaction(repl::OpTime(Timestamp(0, 0), 0)); + entry2.setStatementId(1); insertOplogEntry(entry2); SessionTxnRecord sessionRecord2; @@ -284,6 +306,7 @@ TEST_F(SessionCatalogMigrationSourceTest, SessionDumpWithMultipleNewWrites) { repl::OplogEntry entry1( repl::OpTime(Timestamp(52, 345), 2), 0, repl::OpTypeEnum::kInsert, kNs, BSON("x" << 30)); entry1.setPrevWriteOpTimeInTransaction(repl::OpTime(Timestamp(0, 0), 0)); + entry1.setStatementId(0); insertOplogEntry(entry1); SessionTxnRecord sessionRecord1; @@ -298,11 +321,13 @@ TEST_F(SessionCatalogMigrationSourceTest, SessionDumpWithMultipleNewWrites) { repl::OplogEntry entry2( repl::OpTime(Timestamp(53, 12), 2), 0, repl::OpTypeEnum::kDelete, kNs, BSON("x" << 30)); entry2.setPrevWriteOpTimeInTransaction(repl::OpTime(Timestamp(0, 0), 0)); + entry2.setStatementId(1); insertOplogEntry(entry2); repl::OplogEntry entry3( repl::OpTime(Timestamp(55, 12), 2), 0, repl::OpTypeEnum::kInsert, kNs, BSON("z" << 40)); entry3.setPrevWriteOpTimeInTransaction(repl::OpTime(Timestamp(0, 0), 0)); + entry3.setStatementId(2); insertOplogEntry(entry3); SessionCatalogMigrationSource migrationSource(kNs); @@ -366,6 +391,7 @@ TEST_F(SessionCatalogMigrationSourceTest, ShouldBeAbleInsertNewWritesAfterBuffer kNs, BSON("x" << 30)); entry.setPrevWriteOpTimeInTransaction(repl::OpTime(Timestamp(0, 0), 0)); + entry.setStatementId(0); insertOplogEntry(entry); migrationSource.notifyNewWriteOpTime(entry.getOpTime()); @@ -385,6 +411,7 @@ TEST_F(SessionCatalogMigrationSourceTest, ShouldBeAbleInsertNewWritesAfterBuffer repl::OplogEntry entry( repl::OpTime(Timestamp(53, 12), 2), 0, repl::OpTypeEnum::kDelete, kNs, BSON("x" << 30)); entry.setPrevWriteOpTimeInTransaction(repl::OpTime(Timestamp(0, 0), 0)); + entry.setStatementId(1); insertOplogEntry(entry); migrationSource.notifyNewWriteOpTime(entry.getOpTime()); @@ -403,6 +430,7 @@ TEST_F(SessionCatalogMigrationSourceTest, ShouldBeAbleInsertNewWritesAfterBuffer repl::OplogEntry entry( repl::OpTime(Timestamp(55, 12), 2), 0, repl::OpTypeEnum::kInsert, kNs, BSON("z" << 40)); entry.setPrevWriteOpTimeInTransaction(repl::OpTime(Timestamp(0, 0), 0)); + entry.setStatementId(2); insertOplogEntry(entry); migrationSource.notifyNewWriteOpTime(entry.getOpTime()); @@ -418,6 +446,97 @@ TEST_F(SessionCatalogMigrationSourceTest, ShouldBeAbleInsertNewWritesAfterBuffer } } +TEST_F(SessionCatalogMigrationSourceTest, ReturnsDeadEndSentinelForIncompleteHistory) { + const NamespaceString kNs("a.b"); + + repl::OplogEntry entry( + repl::OpTime(Timestamp(52, 345), 2), 0, repl::OpTypeEnum::kInsert, kNs, BSON("x" << 30)); + entry.setPrevWriteOpTimeInTransaction(repl::OpTime(Timestamp(40, 1), 2)); + entry.setStatementId(0); + insertOplogEntry(entry); + + const auto sessionId = makeLogicalSessionIdForTest(); + + SessionTxnRecord sessionRecord; + sessionRecord.setSessionId(sessionId); + sessionRecord.setTxnNum(31); + sessionRecord.setLastWriteOpTime(entry.getOpTime()); + + DBDirectClient client(opCtx()); + client.insert(NamespaceString::kSessionTransactionsTableNamespace.ns(), sessionRecord.toBSON()); + + SessionCatalogMigrationSource migrationSource(kNs); + migrationSource.init(opCtx()); + ASSERT_TRUE(migrationSource.fetchNextOplog(opCtx())); + + { + ASSERT_TRUE(migrationSource.hasMoreOplog()); + auto nextOplogResult = migrationSource.getLastFetchedOplog(); + ASSERT_FALSE(nextOplogResult.shouldWaitForMajority); + // Cannot compare directly because of SERVER-31356 + ASSERT_BSONOBJ_EQ(entry.toBSON(), nextOplogResult.oplog->toBSON()); + ASSERT_TRUE(migrationSource.fetchNextOplog(opCtx())); + } + + { + ASSERT_TRUE(migrationSource.hasMoreOplog()); + auto nextOplogResult = migrationSource.getLastFetchedOplog(); + ASSERT_FALSE(nextOplogResult.shouldWaitForMajority); + + auto oplog = *nextOplogResult.oplog; + ASSERT_TRUE(oplog.getObject2()); + ASSERT_BSONOBJ_EQ(Session::kDeadEndSentinel, *oplog.getObject2()); + ASSERT_TRUE(oplog.getStatementId()); + ASSERT_EQ(kIncompleteHistoryStmtId, *oplog.getStatementId()); + + auto sessionInfo = oplog.getOperationSessionInfo(); + ASSERT_TRUE(sessionInfo.getSessionId()); + ASSERT_EQ(sessionId, *sessionInfo.getSessionId()); + ASSERT_TRUE(sessionInfo.getTxnNumber()); + ASSERT_EQ(31, *sessionInfo.getTxnNumber()); + } + + ASSERT_FALSE(migrationSource.fetchNextOplog(opCtx())); + ASSERT_FALSE(migrationSource.hasMoreOplog()); +} + +TEST_F(SessionCatalogMigrationSourceTest, ShouldAssertWhenRollbackDetected) { + const NamespaceString kNs("a.b"); + + repl::OplogEntry entry( + repl::OpTime(Timestamp(52, 345), 2), 0, repl::OpTypeEnum::kInsert, kNs, BSON("x" << 30)); + entry.setPrevWriteOpTimeInTransaction(repl::OpTime(Timestamp(40, 1), 2)); + entry.setStatementId(0); + insertOplogEntry(entry); + + const auto sessionId = makeLogicalSessionIdForTest(); + + SessionTxnRecord sessionRecord; + sessionRecord.setSessionId(sessionId); + sessionRecord.setTxnNum(31); + sessionRecord.setLastWriteOpTime(entry.getOpTime()); + + DBDirectClient client(opCtx()); + client.insert(NamespaceString::kSessionTransactionsTableNamespace.ns(), sessionRecord.toBSON()); + + SessionCatalogMigrationSource migrationSource(kNs); + migrationSource.init(opCtx()); + ASSERT_TRUE(migrationSource.fetchNextOplog(opCtx())); + + { + ASSERT_TRUE(migrationSource.hasMoreOplog()); + auto nextOplogResult = migrationSource.getLastFetchedOplog(); + ASSERT_FALSE(nextOplogResult.shouldWaitForMajority); + // Cannot compare directly because of SERVER-31356 + ASSERT_BSONOBJ_EQ(entry.toBSON(), nextOplogResult.oplog->toBSON()); + } + + ASSERT_OK(repl::ReplicationProcess::get(opCtx())->incrementRollbackID(opCtx())); + + ASSERT_THROWS(migrationSource.fetchNextOplog(opCtx()), AssertionException); + ASSERT_TRUE(migrationSource.hasMoreOplog()); +} + } // namespace } // namespace mongo |