summaryrefslogtreecommitdiff
path: root/src/mongo/db/s/session_catalog_migration_source_test.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'src/mongo/db/s/session_catalog_migration_source_test.cpp')
-rw-r--r--src/mongo/db/s/session_catalog_migration_source_test.cpp131
1 files changed, 125 insertions, 6 deletions
diff --git a/src/mongo/db/s/session_catalog_migration_source_test.cpp b/src/mongo/db/s/session_catalog_migration_source_test.cpp
index c1e707c0c9f..2bd48ce08dc 100644
--- a/src/mongo/db/s/session_catalog_migration_source_test.cpp
+++ b/src/mongo/db/s/session_catalog_migration_source_test.cpp
@@ -31,7 +31,9 @@
#include "mongo/db/dbdirectclient.h"
#include "mongo/db/logical_session_id.h"
#include "mongo/db/repl/mock_repl_coord_server_fixture.h"
+#include "mongo/db/repl/replication_process.h"
#include "mongo/db/s/session_catalog_migration_source.h"
+#include "mongo/db/session.h"
#include "mongo/db/session_txn_record_gen.h"
#include "mongo/executor/remote_command_request.h"
#include "mongo/unittest/unittest.h"
@@ -58,11 +60,13 @@ TEST_F(SessionCatalogMigrationSourceTest, OneSessionWithTwoWrites) {
repl::OplogEntry entry1(
repl::OpTime(Timestamp(52, 345), 2), 0, repl::OpTypeEnum::kInsert, kNs, BSON("x" << 30));
entry1.setPrevWriteOpTimeInTransaction(repl::OpTime(Timestamp(0, 0), 0));
+ entry1.setStatementId(0);
insertOplogEntry(entry1);
repl::OplogEntry entry2(
repl::OpTime(Timestamp(67, 54801), 2), 0, repl::OpTypeEnum::kInsert, kNs, BSON("y" << 50));
entry2.setPrevWriteOpTimeInTransaction(entry1.getOpTime());
+ entry2.setStatementId(1);
insertOplogEntry(entry2);
SessionTxnRecord sessionRecord;
@@ -104,9 +108,11 @@ TEST_F(SessionCatalogMigrationSourceTest, TwoSessionWithTwoWrites) {
repl::OplogEntry entry1a(
repl::OpTime(Timestamp(52, 345), 2), 0, repl::OpTypeEnum::kInsert, kNs, BSON("x" << 30));
entry1a.setPrevWriteOpTimeInTransaction(repl::OpTime(Timestamp(0, 0), 0));
+ entry1a.setStatementId(0);
repl::OplogEntry entry1b(
repl::OpTime(Timestamp(67, 54801), 2), 0, repl::OpTypeEnum::kInsert, kNs, BSON("y" << 50));
+ entry1b.setStatementId(1);
entry1b.setPrevWriteOpTimeInTransaction(entry1a.getOpTime());
SessionTxnRecord sessionRecord1;
@@ -121,10 +127,12 @@ TEST_F(SessionCatalogMigrationSourceTest, TwoSessionWithTwoWrites) {
repl::OplogEntry entry2a(
repl::OpTime(Timestamp(43, 12), 2), 0, repl::OpTypeEnum::kDelete, kNs, BSON("x" << 30));
entry2a.setPrevWriteOpTimeInTransaction(repl::OpTime(Timestamp(0, 0), 0));
+ entry2a.setStatementId(3);
repl::OplogEntry entry2b(
repl::OpTime(Timestamp(789, 13), 2), 0, repl::OpTypeEnum::kDelete, kNs, BSON("y" << 50));
entry2b.setPrevWriteOpTimeInTransaction(entry2a.getOpTime());
+ entry2b.setStatementId(4);
SessionTxnRecord sessionRecord2;
sessionRecord2.setSessionId(makeLogicalSessionIdForTest());
@@ -162,15 +170,23 @@ TEST_F(SessionCatalogMigrationSourceTest, TwoSessionWithTwoWrites) {
}
};
- checkNextBatch(entry1b, entry1a);
+ if (sessionRecord1.getSessionId().toBSON().woCompare(sessionRecord2.getSessionId().toBSON()) <
+ 0) {
+ checkNextBatch(entry2b, entry2a);
- ASSERT_TRUE(migrationSource.fetchNextOplog(opCtx()));
- ASSERT_TRUE(migrationSource.hasMoreOplog());
+ ASSERT_TRUE(migrationSource.fetchNextOplog(opCtx()));
+ ASSERT_TRUE(migrationSource.hasMoreOplog());
- checkNextBatch(entry2b, entry2a);
+ checkNextBatch(entry1b, entry1a);
- ASSERT_FALSE(migrationSource.fetchNextOplog(opCtx()));
- ASSERT_FALSE(migrationSource.hasMoreOplog());
+ } else {
+ checkNextBatch(entry1b, entry1a);
+
+ ASSERT_TRUE(migrationSource.fetchNextOplog(opCtx()));
+ ASSERT_TRUE(migrationSource.hasMoreOplog());
+
+ checkNextBatch(entry2b, entry2a);
+ }
}
// It is currently not possible to have 2 findAndModify operations in one transaction, but this
@@ -181,17 +197,20 @@ TEST_F(SessionCatalogMigrationSourceTest, OneSessionWithFindAndModifyPreImageAnd
repl::OplogEntry entry1(
repl::OpTime(Timestamp(52, 345), 2), 0, repl::OpTypeEnum::kNoop, kNs, BSON("x" << 30));
entry1.setPrevWriteOpTimeInTransaction(repl::OpTime(Timestamp(0, 0), 0));
+ entry1.setStatementId(0);
insertOplogEntry(entry1);
repl::OplogEntry entry2(
repl::OpTime(Timestamp(52, 346), 2), 0, repl::OpTypeEnum::kDelete, kNs, BSON("y" << 50));
entry2.setPrevWriteOpTimeInTransaction(repl::OpTime(Timestamp(0, 0), 0));
entry2.setPreImageOpTime(entry1.getOpTime());
+ entry2.setStatementId(1);
insertOplogEntry(entry2);
repl::OplogEntry entry3(
repl::OpTime(Timestamp(73, 5), 2), 0, repl::OpTypeEnum::kNoop, kNs, BSON("x" << 20));
entry3.setPrevWriteOpTimeInTransaction(repl::OpTime(Timestamp(0, 0), 0));
+ entry3.setStatementId(2);
insertOplogEntry(entry3);
repl::OplogEntry entry4(repl::OpTime(Timestamp(73, 6), 2),
@@ -202,6 +221,7 @@ TEST_F(SessionCatalogMigrationSourceTest, OneSessionWithFindAndModifyPreImageAnd
BSON("$inc" << BSON("x" << 1)));
entry4.setPrevWriteOpTimeInTransaction(entry2.getOpTime());
entry4.setPostImageOpTime(entry3.getOpTime());
+ entry4.setStatementId(3);
insertOplogEntry(entry4);
SessionTxnRecord sessionRecord;
@@ -236,6 +256,7 @@ TEST_F(SessionCatalogMigrationSourceTest, OplogWithOtherNsShouldBeIgnored) {
repl::OplogEntry entry1(
repl::OpTime(Timestamp(52, 345), 2), 0, repl::OpTypeEnum::kInsert, kNs, BSON("x" << 30));
entry1.setPrevWriteOpTimeInTransaction(repl::OpTime(Timestamp(0, 0), 0));
+ entry1.setStatementId(0);
insertOplogEntry(entry1);
SessionTxnRecord sessionRecord1;
@@ -254,6 +275,7 @@ TEST_F(SessionCatalogMigrationSourceTest, OplogWithOtherNsShouldBeIgnored) {
NamespaceString("x.y"),
BSON("x" << 30));
entry2.setPrevWriteOpTimeInTransaction(repl::OpTime(Timestamp(0, 0), 0));
+ entry2.setStatementId(1);
insertOplogEntry(entry2);
SessionTxnRecord sessionRecord2;
@@ -284,6 +306,7 @@ TEST_F(SessionCatalogMigrationSourceTest, SessionDumpWithMultipleNewWrites) {
repl::OplogEntry entry1(
repl::OpTime(Timestamp(52, 345), 2), 0, repl::OpTypeEnum::kInsert, kNs, BSON("x" << 30));
entry1.setPrevWriteOpTimeInTransaction(repl::OpTime(Timestamp(0, 0), 0));
+ entry1.setStatementId(0);
insertOplogEntry(entry1);
SessionTxnRecord sessionRecord1;
@@ -298,11 +321,13 @@ TEST_F(SessionCatalogMigrationSourceTest, SessionDumpWithMultipleNewWrites) {
repl::OplogEntry entry2(
repl::OpTime(Timestamp(53, 12), 2), 0, repl::OpTypeEnum::kDelete, kNs, BSON("x" << 30));
entry2.setPrevWriteOpTimeInTransaction(repl::OpTime(Timestamp(0, 0), 0));
+ entry2.setStatementId(1);
insertOplogEntry(entry2);
repl::OplogEntry entry3(
repl::OpTime(Timestamp(55, 12), 2), 0, repl::OpTypeEnum::kInsert, kNs, BSON("z" << 40));
entry3.setPrevWriteOpTimeInTransaction(repl::OpTime(Timestamp(0, 0), 0));
+ entry3.setStatementId(2);
insertOplogEntry(entry3);
SessionCatalogMigrationSource migrationSource(kNs);
@@ -366,6 +391,7 @@ TEST_F(SessionCatalogMigrationSourceTest, ShouldBeAbleInsertNewWritesAfterBuffer
kNs,
BSON("x" << 30));
entry.setPrevWriteOpTimeInTransaction(repl::OpTime(Timestamp(0, 0), 0));
+ entry.setStatementId(0);
insertOplogEntry(entry);
migrationSource.notifyNewWriteOpTime(entry.getOpTime());
@@ -385,6 +411,7 @@ TEST_F(SessionCatalogMigrationSourceTest, ShouldBeAbleInsertNewWritesAfterBuffer
repl::OplogEntry entry(
repl::OpTime(Timestamp(53, 12), 2), 0, repl::OpTypeEnum::kDelete, kNs, BSON("x" << 30));
entry.setPrevWriteOpTimeInTransaction(repl::OpTime(Timestamp(0, 0), 0));
+ entry.setStatementId(1);
insertOplogEntry(entry);
migrationSource.notifyNewWriteOpTime(entry.getOpTime());
@@ -403,6 +430,7 @@ TEST_F(SessionCatalogMigrationSourceTest, ShouldBeAbleInsertNewWritesAfterBuffer
repl::OplogEntry entry(
repl::OpTime(Timestamp(55, 12), 2), 0, repl::OpTypeEnum::kInsert, kNs, BSON("z" << 40));
entry.setPrevWriteOpTimeInTransaction(repl::OpTime(Timestamp(0, 0), 0));
+ entry.setStatementId(2);
insertOplogEntry(entry);
migrationSource.notifyNewWriteOpTime(entry.getOpTime());
@@ -418,6 +446,97 @@ TEST_F(SessionCatalogMigrationSourceTest, ShouldBeAbleInsertNewWritesAfterBuffer
}
}
+TEST_F(SessionCatalogMigrationSourceTest, ReturnsDeadEndSentinelForIncompleteHistory) {
+ const NamespaceString kNs("a.b");
+
+ repl::OplogEntry entry(
+ repl::OpTime(Timestamp(52, 345), 2), 0, repl::OpTypeEnum::kInsert, kNs, BSON("x" << 30));
+ entry.setPrevWriteOpTimeInTransaction(repl::OpTime(Timestamp(40, 1), 2));
+ entry.setStatementId(0);
+ insertOplogEntry(entry);
+
+ const auto sessionId = makeLogicalSessionIdForTest();
+
+ SessionTxnRecord sessionRecord;
+ sessionRecord.setSessionId(sessionId);
+ sessionRecord.setTxnNum(31);
+ sessionRecord.setLastWriteOpTime(entry.getOpTime());
+
+ DBDirectClient client(opCtx());
+ client.insert(NamespaceString::kSessionTransactionsTableNamespace.ns(), sessionRecord.toBSON());
+
+ SessionCatalogMigrationSource migrationSource(kNs);
+ migrationSource.init(opCtx());
+ ASSERT_TRUE(migrationSource.fetchNextOplog(opCtx()));
+
+ {
+ ASSERT_TRUE(migrationSource.hasMoreOplog());
+ auto nextOplogResult = migrationSource.getLastFetchedOplog();
+ ASSERT_FALSE(nextOplogResult.shouldWaitForMajority);
+ // Cannot compare directly because of SERVER-31356
+ ASSERT_BSONOBJ_EQ(entry.toBSON(), nextOplogResult.oplog->toBSON());
+ ASSERT_TRUE(migrationSource.fetchNextOplog(opCtx()));
+ }
+
+ {
+ ASSERT_TRUE(migrationSource.hasMoreOplog());
+ auto nextOplogResult = migrationSource.getLastFetchedOplog();
+ ASSERT_FALSE(nextOplogResult.shouldWaitForMajority);
+
+ auto oplog = *nextOplogResult.oplog;
+ ASSERT_TRUE(oplog.getObject2());
+ ASSERT_BSONOBJ_EQ(Session::kDeadEndSentinel, *oplog.getObject2());
+ ASSERT_TRUE(oplog.getStatementId());
+ ASSERT_EQ(kIncompleteHistoryStmtId, *oplog.getStatementId());
+
+ auto sessionInfo = oplog.getOperationSessionInfo();
+ ASSERT_TRUE(sessionInfo.getSessionId());
+ ASSERT_EQ(sessionId, *sessionInfo.getSessionId());
+ ASSERT_TRUE(sessionInfo.getTxnNumber());
+ ASSERT_EQ(31, *sessionInfo.getTxnNumber());
+ }
+
+ ASSERT_FALSE(migrationSource.fetchNextOplog(opCtx()));
+ ASSERT_FALSE(migrationSource.hasMoreOplog());
+}
+
+TEST_F(SessionCatalogMigrationSourceTest, ShouldAssertWhenRollbackDetected) {
+ const NamespaceString kNs("a.b");
+
+ repl::OplogEntry entry(
+ repl::OpTime(Timestamp(52, 345), 2), 0, repl::OpTypeEnum::kInsert, kNs, BSON("x" << 30));
+ entry.setPrevWriteOpTimeInTransaction(repl::OpTime(Timestamp(40, 1), 2));
+ entry.setStatementId(0);
+ insertOplogEntry(entry);
+
+ const auto sessionId = makeLogicalSessionIdForTest();
+
+ SessionTxnRecord sessionRecord;
+ sessionRecord.setSessionId(sessionId);
+ sessionRecord.setTxnNum(31);
+ sessionRecord.setLastWriteOpTime(entry.getOpTime());
+
+ DBDirectClient client(opCtx());
+ client.insert(NamespaceString::kSessionTransactionsTableNamespace.ns(), sessionRecord.toBSON());
+
+ SessionCatalogMigrationSource migrationSource(kNs);
+ migrationSource.init(opCtx());
+ ASSERT_TRUE(migrationSource.fetchNextOplog(opCtx()));
+
+ {
+ ASSERT_TRUE(migrationSource.hasMoreOplog());
+ auto nextOplogResult = migrationSource.getLastFetchedOplog();
+ ASSERT_FALSE(nextOplogResult.shouldWaitForMajority);
+ // Cannot compare directly because of SERVER-31356
+ ASSERT_BSONOBJ_EQ(entry.toBSON(), nextOplogResult.oplog->toBSON());
+ }
+
+ ASSERT_OK(repl::ReplicationProcess::get(opCtx())->incrementRollbackID(opCtx()));
+
+ ASSERT_THROWS(migrationSource.fetchNextOplog(opCtx()), AssertionException);
+ ASSERT_TRUE(migrationSource.hasMoreOplog());
+}
+
} // namespace
} // namespace mongo