diff options
author | Cheahuychou Mao <mao.cheahuychou@gmail.com> | 2022-04-06 22:49:17 +0000 |
---|---|---|
committer | Evergreen Agent <no-reply@evergreen.mongodb.com> | 2022-04-07 00:05:29 +0000 |
commit | f492d8ef072ce011b31f8fbba10e013c001c5a5a (patch) | |
tree | 5e8e5d1c9d56a218ef13c04092b5c408e40bc237 /src/mongo/db/s/session_catalog_migration_source_test.cpp | |
parent | ecbaddb63705f5ac10f299a539a584d24fcfa20c (diff) | |
download | mongo-f492d8ef072ce011b31f8fbba10e013c001c5a5a.tar.gz |
SERVER-65293 Chunk migration should only migrate internal sessions for retryable writes with the highest txnNumber
Diffstat (limited to 'src/mongo/db/s/session_catalog_migration_source_test.cpp')
-rw-r--r-- | src/mongo/db/s/session_catalog_migration_source_test.cpp | 143 |
1 files changed, 135 insertions, 8 deletions
diff --git a/src/mongo/db/s/session_catalog_migration_source_test.cpp b/src/mongo/db/s/session_catalog_migration_source_test.cpp index 4ead7b2f031..21ec908a1af 100644 --- a/src/mongo/db/s/session_catalog_migration_source_test.cpp +++ b/src/mongo/db/s/session_catalog_migration_source_test.cpp @@ -490,7 +490,7 @@ TEST_F(SessionCatalogMigrationSourceTest, TwoSessionWithTwoWrites) { } }; - if (sessionRecord1.getSessionId().toBSON().woCompare(sessionRecord2.getSessionId().toBSON()) < + if (sessionRecord1.getSessionId().toBSON().woCompare(sessionRecord2.getSessionId().toBSON()) > 0) { checkNextBatch(entry2b, entry2a); @@ -1717,6 +1717,133 @@ TEST_F(SessionCatalogMigrationSourceTest, runTest(true /*isPrepared */); } +TEST_F(SessionCatalogMigrationSourceTest, + DeriveOplogEntriesForCommittedInternalTransactionForRetryableWriteWithLatestTxnNumber) { + DBDirectClient client(opCtx()); + + const auto parentSessionId = makeLogicalSessionIdForTest(); + auto parentTxnNumber = TxnNumber{1}; + + const auto childSessionId1 = + makeLogicalSessionIdWithTxnNumberAndUUIDForTest(parentSessionId, parentTxnNumber); + const auto childTxnNumber1 = TxnNumber{1}; + + auto op1 = makeDurableReplOp( + repl::OpTypeEnum::kUpdate, kNs, BSON("$set" << BSON("_id" << 1)), BSON("x" << 1), {1}); + auto opTime1 = repl::OpTime(Timestamp(210, 1), 1); + auto entry1 = makeApplyOpsOplogEntry(opTime1, + {}, // prevOpTime + {op1}, + childSessionId1, + childTxnNumber1, + false, // isPrepare + false); // isPartial + insertOplogEntry(entry1); + + SessionTxnRecord txnRecord1; + txnRecord1.setSessionId(childSessionId1); + txnRecord1.setTxnNum(childTxnNumber1); + txnRecord1.setLastWriteOpTime(opTime1); + txnRecord1.setLastWriteDate(Date_t::now()); + txnRecord1.setState(DurableTxnStateEnum::kCommitted); + client.insert(NamespaceString::kSessionTransactionsTableNamespace.ns(), txnRecord1.toBSON()); + + ++parentTxnNumber; + const auto childSessionId2 = + makeLogicalSessionIdWithTxnNumberAndUUIDForTest(parentSessionId, parentTxnNumber); + const auto childTxnNumber2 = TxnNumber{1}; + + auto op2 = makeDurableReplOp(repl::OpTypeEnum::kInsert, kNs, BSON("x" << 2), BSONObj(), {1}); + auto opTime2 = repl::OpTime(Timestamp(210, 2), 1); + auto entry2 = makeApplyOpsOplogEntry(opTime2, + {}, // prevOpTime + {op2}, + childSessionId2, + childTxnNumber2, + false, // isPrepare + false); // isPartial + insertOplogEntry(entry2); + + SessionTxnRecord txnRecord2; + txnRecord2.setSessionId(childSessionId2); + txnRecord2.setTxnNum(childTxnNumber2); + txnRecord2.setLastWriteOpTime(opTime2); + txnRecord2.setLastWriteDate(Date_t::now()); + txnRecord2.setState(DurableTxnStateEnum::kCommitted); + client.insert(NamespaceString::kSessionTransactionsTableNamespace.ns(), txnRecord2.toBSON()); + + { + // Create a SessionCatalogMigrationSource. It should return only the oplog entry for the + // internal session with the latest txnNumber. + SessionCatalogMigrationSource migrationSource(opCtx(), kNs, kChunkRange, kShardKey); + + ASSERT_TRUE(migrationSource.fetchNextOplog(opCtx())); + ASSERT_TRUE(migrationSource.hasMoreOplog()); + auto nextOplogResult = migrationSource.getLastFetchedOplog(); + ASSERT_EQ(*nextOplogResult.oplog->getSessionId(), parentSessionId); + ASSERT_EQ(*nextOplogResult.oplog->getTxnNumber(), parentTxnNumber); + ASSERT_BSONOBJ_EQ(nextOplogResult.oplog->getDurableReplOperation().toBSON(), op2.toBSON()); + + ASSERT_FALSE(migrationSource.fetchNextOplog(opCtx())); + } + + const auto otherParentSessionId = makeLogicalSessionIdForTest(); + const auto otherParentTxnNumber = TxnNumber{1}; + + auto opTime3 = repl::OpTime(Timestamp(210, 3), 1); + auto entry3 = makeOplogEntry(opTime3, + repl::OpTypeEnum::kInsert, + BSON("x" << 3), // o + boost::none, // o2 + Date_t::now(), // wall clock time, + otherParentSessionId, + otherParentTxnNumber, + {1}, // statement ids + {}); // prevOpTime + insertOplogEntry(entry3); + + SessionTxnRecord txnRecord3; + txnRecord3.setSessionId(otherParentSessionId); + txnRecord3.setTxnNum(otherParentTxnNumber); + txnRecord3.setLastWriteOpTime(opTime3); + txnRecord3.setLastWriteDate(Date_t::now()); + client.insert(NamespaceString::kSessionTransactionsTableNamespace.ns(), txnRecord3.toBSON()); + + { + // Create another SessionCatalogMigrationSource. It should still return only the oplog entry + // for the internal session with the latest txnNumber. + SessionCatalogMigrationSource migrationSource(opCtx(), kNs, kChunkRange, kShardKey); + + ASSERT_TRUE(migrationSource.fetchNextOplog(opCtx())); + ASSERT_TRUE(migrationSource.hasMoreOplog()); + auto nextOplogResult = migrationSource.getLastFetchedOplog(); + if (parentSessionId.toBSON().woCompare(otherParentSessionId.toBSON()) > 0) { + ASSERT_BSONOBJ_EQ(entry3.getEntry().toBSON(), + nextOplogResult.oplog->getEntry().toBSON()); + } else { + ASSERT_EQ(*nextOplogResult.oplog->getSessionId(), parentSessionId); + ASSERT_EQ(*nextOplogResult.oplog->getTxnNumber(), parentTxnNumber); + ASSERT_BSONOBJ_EQ(nextOplogResult.oplog->getDurableReplOperation().toBSON(), + op2.toBSON()); + } + + ASSERT_TRUE(migrationSource.fetchNextOplog(opCtx())); + ASSERT_TRUE(migrationSource.hasMoreOplog()); + nextOplogResult = migrationSource.getLastFetchedOplog(); + if (parentSessionId.toBSON().woCompare(otherParentSessionId.toBSON()) > 0) { + ASSERT_EQ(*nextOplogResult.oplog->getSessionId(), parentSessionId); + ASSERT_EQ(*nextOplogResult.oplog->getTxnNumber(), parentTxnNumber); + ASSERT_BSONOBJ_EQ(nextOplogResult.oplog->getDurableReplOperation().toBSON(), + op2.toBSON()); + } else { + ASSERT_BSONOBJ_EQ(entry3.getEntry().toBSON(), + nextOplogResult.oplog->getEntry().toBSON()); + } + + ASSERT_FALSE(migrationSource.fetchNextOplog(opCtx())); + } +} + TEST_F( SessionCatalogMigrationSourceTest, DeriveOplogEntriesForCommittedUnpreparedInternalTransactionForRetryableWriteForgePrePostImage) { @@ -2076,7 +2203,7 @@ TEST_F(SessionCatalogMigrationSourceTest, // Logical session ids are generated randomly and the migration source queries in order of // logical session id, so we need to change the order of the checks depending on the ordering of // the sessionIds between the retryable write record and the transaction record. - if (retryableWriteRecord.getSessionId().toBSON().woCompare(txnRecord.getSessionId().toBSON()) < + if (retryableWriteRecord.getSessionId().toBSON().woCompare(txnRecord.getSessionId().toBSON()) > 0) { checkTxnEntry(); checkRetryableWriteEntry(); @@ -2331,7 +2458,7 @@ TEST_F(SessionCatalogMigrationSourceTest, TwoSessionWithTwoWritesContainingWrite BSON("x" << 30), // o boost::none, // o2 Date_t::now(), // wall clock time, - higherSessionId, + lowerSessionId, txnNumber, {0}, // statement ids repl::OpTime(Timestamp(0, 0), 0)); // optime of previous write within same transaction @@ -2342,13 +2469,13 @@ TEST_F(SessionCatalogMigrationSourceTest, TwoSessionWithTwoWritesContainingWrite BSON("x" << -50), // o boost::none, // o2 Date_t::now(), // wall clock time, - higherSessionId, + lowerSessionId, txnNumber, {1}, // statement ids entry1a.getOpTime()); // optime of previous write within same transaction SessionTxnRecord sessionRecord1; - sessionRecord1.setSessionId(higherSessionId); + sessionRecord1.setSessionId(lowerSessionId); sessionRecord1.setTxnNum(txnNumber); sessionRecord1.setLastWriteOpTime(entry1b.getOpTime()); sessionRecord1.setLastWriteDate(entry1b.getWallClockTime()); @@ -2363,7 +2490,7 @@ TEST_F(SessionCatalogMigrationSourceTest, TwoSessionWithTwoWritesContainingWrite BSON("x" << 30), // o boost::none, // o2 Date_t::now(), // wall clock time - lowerSessionId, + higherSessionId, txnNumber, {3}, // statement ids repl::OpTime(Timestamp(0, 0), 0)); // optime of previous write within same transaction @@ -2374,13 +2501,13 @@ TEST_F(SessionCatalogMigrationSourceTest, TwoSessionWithTwoWritesContainingWrite BSON("x" << 50), // o boost::none, // o2 Date_t::now(), // wall clock time, - lowerSessionId, + higherSessionId, txnNumber, {4}, // statement ids entry2a.getOpTime()); // optime of previous write within same transaction SessionTxnRecord sessionRecord2; - sessionRecord2.setSessionId(lowerSessionId); + sessionRecord2.setSessionId(higherSessionId); sessionRecord2.setTxnNum(txnNumber); sessionRecord2.setLastWriteOpTime(entry2b.getOpTime()); sessionRecord2.setLastWriteDate(entry2b.getWallClockTime()); |