summaryrefslogtreecommitdiff
path: root/src/mongo/db/s/session_catalog_migration_source_test.cpp
diff options
context:
space:
mode:
authorCheahuychou Mao <mao.cheahuychou@gmail.com>2022-04-06 22:49:17 +0000
committerEvergreen Agent <no-reply@evergreen.mongodb.com>2022-04-07 00:05:29 +0000
commitf492d8ef072ce011b31f8fbba10e013c001c5a5a (patch)
tree5e8e5d1c9d56a218ef13c04092b5c408e40bc237 /src/mongo/db/s/session_catalog_migration_source_test.cpp
parentecbaddb63705f5ac10f299a539a584d24fcfa20c (diff)
downloadmongo-f492d8ef072ce011b31f8fbba10e013c001c5a5a.tar.gz
SERVER-65293 Chunk migration should only migrate internal sessions for retryable writes with the highest txnNumber
Diffstat (limited to 'src/mongo/db/s/session_catalog_migration_source_test.cpp')
-rw-r--r--src/mongo/db/s/session_catalog_migration_source_test.cpp143
1 files changed, 135 insertions, 8 deletions
diff --git a/src/mongo/db/s/session_catalog_migration_source_test.cpp b/src/mongo/db/s/session_catalog_migration_source_test.cpp
index 4ead7b2f031..21ec908a1af 100644
--- a/src/mongo/db/s/session_catalog_migration_source_test.cpp
+++ b/src/mongo/db/s/session_catalog_migration_source_test.cpp
@@ -490,7 +490,7 @@ TEST_F(SessionCatalogMigrationSourceTest, TwoSessionWithTwoWrites) {
}
};
- if (sessionRecord1.getSessionId().toBSON().woCompare(sessionRecord2.getSessionId().toBSON()) <
+ if (sessionRecord1.getSessionId().toBSON().woCompare(sessionRecord2.getSessionId().toBSON()) >
0) {
checkNextBatch(entry2b, entry2a);
@@ -1717,6 +1717,133 @@ TEST_F(SessionCatalogMigrationSourceTest,
runTest(true /*isPrepared */);
}
+TEST_F(SessionCatalogMigrationSourceTest,
+ DeriveOplogEntriesForCommittedInternalTransactionForRetryableWriteWithLatestTxnNumber) {
+ DBDirectClient client(opCtx());
+
+ const auto parentSessionId = makeLogicalSessionIdForTest();
+ auto parentTxnNumber = TxnNumber{1};
+
+ const auto childSessionId1 =
+ makeLogicalSessionIdWithTxnNumberAndUUIDForTest(parentSessionId, parentTxnNumber);
+ const auto childTxnNumber1 = TxnNumber{1};
+
+ auto op1 = makeDurableReplOp(
+ repl::OpTypeEnum::kUpdate, kNs, BSON("$set" << BSON("_id" << 1)), BSON("x" << 1), {1});
+ auto opTime1 = repl::OpTime(Timestamp(210, 1), 1);
+ auto entry1 = makeApplyOpsOplogEntry(opTime1,
+ {}, // prevOpTime
+ {op1},
+ childSessionId1,
+ childTxnNumber1,
+ false, // isPrepare
+ false); // isPartial
+ insertOplogEntry(entry1);
+
+ SessionTxnRecord txnRecord1;
+ txnRecord1.setSessionId(childSessionId1);
+ txnRecord1.setTxnNum(childTxnNumber1);
+ txnRecord1.setLastWriteOpTime(opTime1);
+ txnRecord1.setLastWriteDate(Date_t::now());
+ txnRecord1.setState(DurableTxnStateEnum::kCommitted);
+ client.insert(NamespaceString::kSessionTransactionsTableNamespace.ns(), txnRecord1.toBSON());
+
+ ++parentTxnNumber;
+ const auto childSessionId2 =
+ makeLogicalSessionIdWithTxnNumberAndUUIDForTest(parentSessionId, parentTxnNumber);
+ const auto childTxnNumber2 = TxnNumber{1};
+
+ auto op2 = makeDurableReplOp(repl::OpTypeEnum::kInsert, kNs, BSON("x" << 2), BSONObj(), {1});
+ auto opTime2 = repl::OpTime(Timestamp(210, 2), 1);
+ auto entry2 = makeApplyOpsOplogEntry(opTime2,
+ {}, // prevOpTime
+ {op2},
+ childSessionId2,
+ childTxnNumber2,
+ false, // isPrepare
+ false); // isPartial
+ insertOplogEntry(entry2);
+
+ SessionTxnRecord txnRecord2;
+ txnRecord2.setSessionId(childSessionId2);
+ txnRecord2.setTxnNum(childTxnNumber2);
+ txnRecord2.setLastWriteOpTime(opTime2);
+ txnRecord2.setLastWriteDate(Date_t::now());
+ txnRecord2.setState(DurableTxnStateEnum::kCommitted);
+ client.insert(NamespaceString::kSessionTransactionsTableNamespace.ns(), txnRecord2.toBSON());
+
+ {
+ // Create a SessionCatalogMigrationSource. It should return only the oplog entry for the
+ // internal session with the latest txnNumber.
+ SessionCatalogMigrationSource migrationSource(opCtx(), kNs, kChunkRange, kShardKey);
+
+ ASSERT_TRUE(migrationSource.fetchNextOplog(opCtx()));
+ ASSERT_TRUE(migrationSource.hasMoreOplog());
+ auto nextOplogResult = migrationSource.getLastFetchedOplog();
+ ASSERT_EQ(*nextOplogResult.oplog->getSessionId(), parentSessionId);
+ ASSERT_EQ(*nextOplogResult.oplog->getTxnNumber(), parentTxnNumber);
+ ASSERT_BSONOBJ_EQ(nextOplogResult.oplog->getDurableReplOperation().toBSON(), op2.toBSON());
+
+ ASSERT_FALSE(migrationSource.fetchNextOplog(opCtx()));
+ }
+
+ const auto otherParentSessionId = makeLogicalSessionIdForTest();
+ const auto otherParentTxnNumber = TxnNumber{1};
+
+ auto opTime3 = repl::OpTime(Timestamp(210, 3), 1);
+ auto entry3 = makeOplogEntry(opTime3,
+ repl::OpTypeEnum::kInsert,
+ BSON("x" << 3), // o
+ boost::none, // o2
+ Date_t::now(), // wall clock time,
+ otherParentSessionId,
+ otherParentTxnNumber,
+ {1}, // statement ids
+ {}); // prevOpTime
+ insertOplogEntry(entry3);
+
+ SessionTxnRecord txnRecord3;
+ txnRecord3.setSessionId(otherParentSessionId);
+ txnRecord3.setTxnNum(otherParentTxnNumber);
+ txnRecord3.setLastWriteOpTime(opTime3);
+ txnRecord3.setLastWriteDate(Date_t::now());
+ client.insert(NamespaceString::kSessionTransactionsTableNamespace.ns(), txnRecord3.toBSON());
+
+ {
+ // Create another SessionCatalogMigrationSource. It should still return only the oplog entry
+ // for the internal session with the latest txnNumber.
+ SessionCatalogMigrationSource migrationSource(opCtx(), kNs, kChunkRange, kShardKey);
+
+ ASSERT_TRUE(migrationSource.fetchNextOplog(opCtx()));
+ ASSERT_TRUE(migrationSource.hasMoreOplog());
+ auto nextOplogResult = migrationSource.getLastFetchedOplog();
+ if (parentSessionId.toBSON().woCompare(otherParentSessionId.toBSON()) > 0) {
+ ASSERT_BSONOBJ_EQ(entry3.getEntry().toBSON(),
+ nextOplogResult.oplog->getEntry().toBSON());
+ } else {
+ ASSERT_EQ(*nextOplogResult.oplog->getSessionId(), parentSessionId);
+ ASSERT_EQ(*nextOplogResult.oplog->getTxnNumber(), parentTxnNumber);
+ ASSERT_BSONOBJ_EQ(nextOplogResult.oplog->getDurableReplOperation().toBSON(),
+ op2.toBSON());
+ }
+
+ ASSERT_TRUE(migrationSource.fetchNextOplog(opCtx()));
+ ASSERT_TRUE(migrationSource.hasMoreOplog());
+ nextOplogResult = migrationSource.getLastFetchedOplog();
+ if (parentSessionId.toBSON().woCompare(otherParentSessionId.toBSON()) > 0) {
+ ASSERT_EQ(*nextOplogResult.oplog->getSessionId(), parentSessionId);
+ ASSERT_EQ(*nextOplogResult.oplog->getTxnNumber(), parentTxnNumber);
+ ASSERT_BSONOBJ_EQ(nextOplogResult.oplog->getDurableReplOperation().toBSON(),
+ op2.toBSON());
+ } else {
+ ASSERT_BSONOBJ_EQ(entry3.getEntry().toBSON(),
+ nextOplogResult.oplog->getEntry().toBSON());
+ }
+
+ ASSERT_FALSE(migrationSource.fetchNextOplog(opCtx()));
+ }
+}
+
TEST_F(
SessionCatalogMigrationSourceTest,
DeriveOplogEntriesForCommittedUnpreparedInternalTransactionForRetryableWriteForgePrePostImage) {
@@ -2076,7 +2203,7 @@ TEST_F(SessionCatalogMigrationSourceTest,
// Logical session ids are generated randomly and the migration source queries in order of
// logical session id, so we need to change the order of the checks depending on the ordering of
// the sessionIds between the retryable write record and the transaction record.
- if (retryableWriteRecord.getSessionId().toBSON().woCompare(txnRecord.getSessionId().toBSON()) <
+ if (retryableWriteRecord.getSessionId().toBSON().woCompare(txnRecord.getSessionId().toBSON()) >
0) {
checkTxnEntry();
checkRetryableWriteEntry();
@@ -2331,7 +2458,7 @@ TEST_F(SessionCatalogMigrationSourceTest, TwoSessionWithTwoWritesContainingWrite
BSON("x" << 30), // o
boost::none, // o2
Date_t::now(), // wall clock time,
- higherSessionId,
+ lowerSessionId,
txnNumber,
{0}, // statement ids
repl::OpTime(Timestamp(0, 0), 0)); // optime of previous write within same transaction
@@ -2342,13 +2469,13 @@ TEST_F(SessionCatalogMigrationSourceTest, TwoSessionWithTwoWritesContainingWrite
BSON("x" << -50), // o
boost::none, // o2
Date_t::now(), // wall clock time,
- higherSessionId,
+ lowerSessionId,
txnNumber,
{1}, // statement ids
entry1a.getOpTime()); // optime of previous write within same transaction
SessionTxnRecord sessionRecord1;
- sessionRecord1.setSessionId(higherSessionId);
+ sessionRecord1.setSessionId(lowerSessionId);
sessionRecord1.setTxnNum(txnNumber);
sessionRecord1.setLastWriteOpTime(entry1b.getOpTime());
sessionRecord1.setLastWriteDate(entry1b.getWallClockTime());
@@ -2363,7 +2490,7 @@ TEST_F(SessionCatalogMigrationSourceTest, TwoSessionWithTwoWritesContainingWrite
BSON("x" << 30), // o
boost::none, // o2
Date_t::now(), // wall clock time
- lowerSessionId,
+ higherSessionId,
txnNumber,
{3}, // statement ids
repl::OpTime(Timestamp(0, 0), 0)); // optime of previous write within same transaction
@@ -2374,13 +2501,13 @@ TEST_F(SessionCatalogMigrationSourceTest, TwoSessionWithTwoWritesContainingWrite
BSON("x" << 50), // o
boost::none, // o2
Date_t::now(), // wall clock time,
- lowerSessionId,
+ higherSessionId,
txnNumber,
{4}, // statement ids
entry2a.getOpTime()); // optime of previous write within same transaction
SessionTxnRecord sessionRecord2;
- sessionRecord2.setSessionId(lowerSessionId);
+ sessionRecord2.setSessionId(higherSessionId);
sessionRecord2.setTxnNum(txnNumber);
sessionRecord2.setLastWriteOpTime(entry2b.getOpTime());
sessionRecord2.setLastWriteDate(entry2b.getWallClockTime());