diff options
-rw-r--r-- | src/mongo/db/s/session_catalog_migration_source.cpp | 25 | ||||
-rw-r--r-- | src/mongo/db/s/session_catalog_migration_source_test.cpp | 337 |
2 files changed, 343 insertions, 19 deletions
diff --git a/src/mongo/db/s/session_catalog_migration_source.cpp b/src/mongo/db/s/session_catalog_migration_source.cpp index 75b22f61342..36785825a04 100644 --- a/src/mongo/db/s/session_catalog_migration_source.cpp +++ b/src/mongo/db/s/session_catalog_migration_source.cpp @@ -201,12 +201,27 @@ SessionCatalogMigrationSource::SessionCatalogMigrationSource(OperationContext* o // tests. DBDirectClient client(opCtx); FindCommandRequest findRequest{NamespaceString::kSessionTransactionsTableNamespace}; - findRequest.setSort(BSON("_id" << 1)); + // Skip retryable internal transactions that are either aborted or still in progress so there is + // no write history to transfer at this point. + // TODO (SERVER-64331): Determine if chunk migration should migrate internal sessions for + // non-retryable writes. + findRequest.setFilter(BSON( + "$or" << BSON_ARRAY(BSON((SessionTxnRecord::kSessionIdFieldName + "." + + InternalSessionFields::kTxnUUIDFieldName) + << BSON("$exists" << false)) + << BSON("$and" << BSON_ARRAY(BSON( + (SessionTxnRecord::kSessionIdFieldName + "." + + InternalSessionFields::kTxnNumberFieldName) + << BSON("$exists" << true) + << SessionTxnRecord::kStateFieldName << "committed")))))); + findRequest.setSort(BSON(SessionTxnRecord::kSessionIdFieldName << 1)); auto cursor = client.find(std::move(findRequest)); while (cursor->more()) { auto nextSession = SessionTxnRecord::parse( IDLParserErrorContext("Session migration cloning"), cursor->next()); + const auto sessionId = nextSession.getSessionId(); + if (!nextSession.getLastWriteOpTime().isNull()) { _sessionOplogIterators.push_back( std::make_unique<SessionOplogIterator>(std::move(nextSession), _rollbackIdAtInit)); @@ -486,6 +501,14 @@ bool SessionCatalogMigrationSource::_fetchNextNewWriteOplog(OperationContext* op // If this oplog entry corresponds to transaction prepare/commit, replace it with a sentinel // entry. if (entryAtOpTimeType == EntryAtOpTimeType::kTransaction) { + const auto sessionId = *newWriteOplogEntry.getSessionId(); + + if (isInternalSessionForNonRetryableWrite(sessionId)) { + // TODO (SERVER-64331): Determine if chunk migration should migrate internal sessions + // for non-retryable writes. + return false; + } + newWriteOplogEntry = makeSentinelOplogEntry(*newWriteOplogEntry.getSessionId(), *newWriteOplogEntry.getTxnNumber(), diff --git a/src/mongo/db/s/session_catalog_migration_source_test.cpp b/src/mongo/db/s/session_catalog_migration_source_test.cpp index 0aa5aad5d0f..f2fe2b99e4a 100644 --- a/src/mongo/db/s/session_catalog_migration_source_test.cpp +++ b/src/mongo/db/s/session_catalog_migration_source_test.cpp @@ -181,6 +181,83 @@ repl::OplogEntry makeRewrittenOplogInSession(repl::OpTime opTime, .get()); // optime of previous write within same transaction }; +repl::DurableReplOperation makeDurableReplOp( + const mongo::repl::OpTypeEnum opType, + const NamespaceString& nss, + const BSONObj& object, + boost::optional<BSONObj> object2, + const std::vector<int> stmtIds, + boost::optional<repl::RetryImageEnum> needsRetryImage = boost::none, + boost::optional<repl::OpTime> preImageOpTime = boost::none, + boost::optional<repl::OpTime> postImageOpTime = boost::none) { + auto op = repl::DurableReplOperation(opType, nss, object); + op.setObject2(object2); + if (stmtIds.size() == 1) { + // This is required for making BSON equality check in the tests below work. + op.setStatementIds({{stmtIds.front()}}); + } else if (!stmtIds.empty()) { + op.setStatementIds({{stmtIds}}); + } + op.setNeedsRetryImage(needsRetryImage); + op.setPreImageOpTime(preImageOpTime); + op.setPostImageOpTime(postImageOpTime); + return op; +} + +repl::OplogEntry makeApplyOpsOplogEntry(repl::OpTime opTime, + repl::OpTime prevWriteOpTimeInTransaction, + std::vector<repl::DurableReplOperation> ops, + LogicalSessionId sessionId, + TxnNumber txnNumber, + bool isPrepare, + bool isPartial) { + BSONObjBuilder applyOpsBuilder; + + BSONArrayBuilder opsArrayBuilder = applyOpsBuilder.subarrayStart("applyOps"); + for (const auto& op : ops) { + opsArrayBuilder.append(op.toBSON()); + } + opsArrayBuilder.done(); + + if (isPrepare) { + applyOpsBuilder.append(repl::ApplyOpsCommandInfoBase::kPrepareFieldName, true); + } + + if (isPartial) { + applyOpsBuilder.append(repl::ApplyOpsCommandInfoBase::kPartialTxnFieldName, true); + } + + repl::MutableOplogEntry op; + op.setOpType(repl::OpTypeEnum::kCommand); + op.setObject(applyOpsBuilder.obj()); + op.setSessionId(std::move(sessionId)); + op.setTxnNumber(std::move(txnNumber)); + op.setOpTime(opTime); + op.setPrevWriteOpTimeInTransaction(prevWriteOpTimeInTransaction); + op.setWallClockTime(Date_t::now()); + op.setNss({}); + + return {op.toBSON()}; +} + +repl::OplogEntry makeCommandOplogEntry(repl::OpTime opTime, + repl::OpTime prevWriteOpTimeInTransaction, + BSONObj commandObj, + boost::optional<LogicalSessionId> sessionId, + boost::optional<TxnNumber> txnNumber) { + repl::MutableOplogEntry op; + op.setOpType(repl::OpTypeEnum::kCommand); + op.setObject(std::move(commandObj)); + op.setSessionId(std::move(sessionId)); + op.setTxnNumber(std::move(txnNumber)); + op.setOpTime(opTime); + op.setPrevWriteOpTimeInTransaction(prevWriteOpTimeInTransaction); + op.setWallClockTime({}); + op.setNss({}); + + return {op.toBSON()}; +}; + TEST_F(SessionCatalogMigrationSourceTest, NoSessionsToTransferShouldNotHaveOplog) { SessionCatalogMigrationSource migrationSource(opCtx(), kNs, kChunkRange, kShardKey); @@ -821,6 +898,68 @@ TEST_F(SessionCatalogMigrationSourceTest, ShouldAssertIfOplogCannotBeFound) { ASSERT_THROWS(migrationSource.fetchNextOplog(opCtx()), AssertionException); } +TEST_F(SessionCatalogMigrationSourceTest, + ReturnDeadEndSentinelOplogEntryForNewCommittedNonInternalTransaction) { + SessionCatalogMigrationSource migrationSource(opCtx(), kNs, kChunkRange, kShardKey); + ASSERT_FALSE(migrationSource.fetchNextOplog(opCtx())); + + const auto sessionId = makeLogicalSessionIdForTest(); + const auto txnNumber = TxnNumber{1}; + + auto op = makeDurableReplOp(repl::OpTypeEnum::kInsert, kNs, BSON("x" << 1), BSONObj(), {}); + auto applyOpsOpTime = repl::OpTime(Timestamp(110, 1), 1); + auto entry = makeApplyOpsOplogEntry(applyOpsOpTime, + {}, // prevOpTime + {op}, + sessionId, + txnNumber, + false, // isPrepare + false); // isPartial + insertOplogEntry(entry); + + migrationSource.notifyNewWriteOpTime( + entry.getOpTime(), SessionCatalogMigrationSource::EntryAtOpTimeType::kTransaction); + ASSERT_TRUE(migrationSource.hasMoreOplog()); + ASSERT_TRUE(migrationSource.fetchNextOplog(opCtx())); + + auto nextOplogResult = migrationSource.getLastFetchedOplog(); + ASSERT_TRUE(nextOplogResult.shouldWaitForMajority); + ASSERT_EQ(*nextOplogResult.oplog->getSessionId(), sessionId); + ASSERT_EQ(*nextOplogResult.oplog->getTxnNumber(), txnNumber); + ASSERT_BSONOBJ_EQ(*nextOplogResult.oplog->getObject2(), + TransactionParticipant::kDeadEndSentinel); + auto stmtIds = nextOplogResult.oplog->getStatementIds(); + ASSERT_EQ(stmtIds.size(), 1U); + ASSERT_EQ(stmtIds[0], kIncompleteHistoryStmtId); + + ASSERT_FALSE(migrationSource.fetchNextOplog(opCtx())); +} + +TEST_F(SessionCatalogMigrationSourceTest, + IgnoreNewCommittedForInternalTransactionForNonRetryableWrite) { + SessionCatalogMigrationSource migrationSource(opCtx(), kNs, kChunkRange, kShardKey); + ASSERT_FALSE(migrationSource.fetchNextOplog(opCtx())); + + const auto sessionId = makeLogicalSessionIdWithTxnUUIDForTest(); + const auto txnNumber = TxnNumber{1}; + + auto op = makeDurableReplOp(repl::OpTypeEnum::kInsert, kNs, BSON("x" << 1), BSONObj(), {}); + auto applyOpsOpTime = repl::OpTime(Timestamp(120, 1), 1); + auto entry = makeApplyOpsOplogEntry(applyOpsOpTime, + {}, // prevOpTime + {op}, + sessionId, + txnNumber, + false, // isPrepare + false); // isPartial + insertOplogEntry(entry); + + migrationSource.notifyNewWriteOpTime( + entry.getOpTime(), SessionCatalogMigrationSource::EntryAtOpTimeType::kTransaction); + ASSERT_TRUE(migrationSource.hasMoreOplog()); + ASSERT_FALSE(migrationSource.fetchNextOplog(opCtx())); +} + TEST_F(SessionCatalogMigrationSourceTest, ShouldBeAbleInsertNewWritesAfterBufferWasDepleted) { const auto sessionId = makeLogicalSessionIdForTest(); const auto txnNumber = TxnNumber{1}; @@ -1011,11 +1150,25 @@ TEST_F(SessionCatalogMigrationSourceTest, ShouldAssertWhenRollbackDetected) { } TEST_F(SessionCatalogMigrationSourceTest, - CommitTransactionEntriesShouldBeConvertedToDeadEndSentinel) { + ReturnDeadEndSentinelOplogEntryForCommittedNonInternalTransaction) { + const auto sessionId = makeLogicalSessionIdForTest(); + const auto txnNumber = TxnNumber{1}; + + auto op = makeDurableReplOp(repl::OpTypeEnum::kInsert, kNs, BSON("x" << 1), BSONObj(), {}); + auto applyOpsOpTime = repl::OpTime(Timestamp(160, 1), 1); + auto entry = makeApplyOpsOplogEntry(applyOpsOpTime, + {}, // prevOpTime + {op}, + sessionId, + txnNumber, + false, // isPrepare + false); // isPartial + insertOplogEntry(entry); + SessionTxnRecord txnRecord; - txnRecord.setSessionId(makeLogicalSessionIdForTest()); - txnRecord.setTxnNum(20); - txnRecord.setLastWriteOpTime(repl::OpTime(Timestamp(12, 34), 5)); + txnRecord.setSessionId(sessionId); + txnRecord.setTxnNum(txnNumber); + txnRecord.setLastWriteOpTime(repl::OpTime(entry.getOpTime())); txnRecord.setLastWriteDate(Date_t::now()); txnRecord.setState(DurableTxnStateEnum::kCommitted); @@ -1036,12 +1189,57 @@ TEST_F(SessionCatalogMigrationSourceTest, ASSERT_FALSE(migrationSource.fetchNextOplog(opCtx())); } +TEST_F(SessionCatalogMigrationSourceTest, IgnoreCommittedInternalTransactionForNonRetryableWrite) { + const auto sessionId = makeLogicalSessionIdWithTxnUUIDForTest(); + const auto txnNumber = TxnNumber{1}; + + auto op = makeDurableReplOp(repl::OpTypeEnum::kInsert, kNs, BSON("x" << 1), BSONObj(), {}); + auto applyOpsOpTime = repl::OpTime(Timestamp(170, 1), 1); + auto entry = makeApplyOpsOplogEntry(applyOpsOpTime, + {}, // prevOpTime + {op}, + sessionId, + txnNumber, + false, // isPrepare + false); // isPartial + insertOplogEntry(entry); + + SessionTxnRecord txnRecord; + txnRecord.setSessionId(sessionId); + txnRecord.setTxnNum(txnNumber); + txnRecord.setLastWriteOpTime(repl::OpTime(entry.getOpTime())); + txnRecord.setLastWriteDate(Date_t::now()); + txnRecord.setState(DurableTxnStateEnum::kCommitted); + + DBDirectClient client(opCtx()); + client.insert(NamespaceString::kSessionTransactionsTableNamespace.ns(), txnRecord.toBSON()); + + SessionCatalogMigrationSource migrationSource(opCtx(), kNs, kChunkRange, kShardKey); + + ASSERT_FALSE(migrationSource.fetchNextOplog(opCtx())); + ASSERT_FALSE(migrationSource.hasMoreOplog()); +} + TEST_F(SessionCatalogMigrationSourceTest, - PrepareTransactionEntriesShouldBeConvertedToDeadEndSentinel) { + ReturnDeadEndSentinelOplogEntryForPreparedNonInternalTransaction) { + const auto sessionId = makeLogicalSessionIdForTest(); + const auto txnNumber = TxnNumber{1}; + + auto op = makeDurableReplOp(repl::OpTypeEnum::kInsert, kNs, BSON("x" << 1), BSONObj(), {}); + auto applyOpsOpTime = repl::OpTime(Timestamp(180, 1), 1); + auto entry = makeApplyOpsOplogEntry(applyOpsOpTime, + {}, // prevOpTime + {op}, + sessionId, + txnNumber, + true, // isPrepare + false); // isPartial + insertOplogEntry(entry); + SessionTxnRecord txnRecord; - txnRecord.setSessionId(makeLogicalSessionIdForTest()); - txnRecord.setTxnNum(20); - txnRecord.setLastWriteOpTime(repl::OpTime(Timestamp(12, 34), 5)); + txnRecord.setSessionId(sessionId); + txnRecord.setTxnNum(txnNumber); + txnRecord.setLastWriteOpTime(repl::OpTime(entry.getOpTime())); txnRecord.setLastWriteDate(Date_t::now()); txnRecord.setState(DurableTxnStateEnum::kPrepared); @@ -1062,13 +1260,27 @@ TEST_F(SessionCatalogMigrationSourceTest, ASSERT_FALSE(migrationSource.fetchNextOplog(opCtx())); } -TEST_F(SessionCatalogMigrationSourceTest, InProgressTransactionEntriesShouldBeIgnored) { +TEST_F(SessionCatalogMigrationSourceTest, IgnorePreparedInternalTransactionForNonRetryableWrite) { + const auto sessionId = makeLogicalSessionIdWithTxnUUIDForTest(); + const auto txnNumber = TxnNumber{1}; + + auto op = makeDurableReplOp(repl::OpTypeEnum::kInsert, kNs, BSON("x" << 1), BSONObj(), {}); + auto applyOpsOpTime = repl::OpTime(Timestamp(190, 1), 1); + auto entry = makeApplyOpsOplogEntry(applyOpsOpTime, + {}, // prevOpTime + {op}, + sessionId, + txnNumber, + true, // isPrepare + false); // isPartial + insertOplogEntry(entry); + SessionTxnRecord txnRecord; - txnRecord.setSessionId(makeLogicalSessionIdForTest()); - txnRecord.setTxnNum(20); - txnRecord.setLastWriteOpTime(repl::OpTime(Timestamp(12, 34), 5)); + txnRecord.setSessionId(sessionId); + txnRecord.setTxnNum(txnNumber); + txnRecord.setLastWriteOpTime(repl::OpTime(entry.getOpTime())); txnRecord.setLastWriteDate(Date_t::now()); - txnRecord.setState(DurableTxnStateEnum::kInProgress); + txnRecord.setState(DurableTxnStateEnum::kPrepared); DBDirectClient client(opCtx()); client.insert(NamespaceString::kSessionTransactionsTableNamespace.ns(), txnRecord.toBSON()); @@ -1079,13 +1291,27 @@ TEST_F(SessionCatalogMigrationSourceTest, InProgressTransactionEntriesShouldBeIg ASSERT_FALSE(migrationSource.hasMoreOplog()); } -TEST_F(SessionCatalogMigrationSourceTest, AbortedTransactionEntriesShouldBeIgnored) { +TEST_F(SessionCatalogMigrationSourceTest, IgnorePreparedInternalTransactionForRetryableWrite) { + const auto sessionId = makeLogicalSessionIdWithTxnNumberAndUUIDForTest(); + const auto txnNumber = TxnNumber{1}; + + auto op = makeDurableReplOp(repl::OpTypeEnum::kInsert, kNs, BSON("x" << 1), BSONObj(), {1}); + auto applyOpsOpTime = repl::OpTime(Timestamp(200, 1), 1); + auto entry = makeApplyOpsOplogEntry(applyOpsOpTime, + {}, // prevOpTime + {op}, + sessionId, + txnNumber, + true, // isPrepare + false); // isPartial + insertOplogEntry(entry); + SessionTxnRecord txnRecord; - txnRecord.setSessionId(makeLogicalSessionIdForTest()); - txnRecord.setTxnNum(20); - txnRecord.setLastWriteOpTime(repl::OpTime(Timestamp(12, 34), 5)); + txnRecord.setSessionId(sessionId); + txnRecord.setTxnNum(txnNumber); + txnRecord.setLastWriteOpTime(repl::OpTime(entry.getOpTime())); txnRecord.setLastWriteDate(Date_t::now()); - txnRecord.setState(DurableTxnStateEnum::kAborted); + txnRecord.setState(DurableTxnStateEnum::kPrepared); DBDirectClient client(opCtx()); client.insert(NamespaceString::kSessionTransactionsTableNamespace.ns(), txnRecord.toBSON()); @@ -1096,6 +1322,81 @@ TEST_F(SessionCatalogMigrationSourceTest, AbortedTransactionEntriesShouldBeIgnor ASSERT_FALSE(migrationSource.hasMoreOplog()); } +TEST_F(SessionCatalogMigrationSourceTest, IgnoreInProgressTransaction) { + auto runTest = [&](const LogicalSessionId& sessionId) { + const auto txnNumber = TxnNumber{1}; + SessionTxnRecord txnRecord; + txnRecord.setSessionId(sessionId); + txnRecord.setTxnNum(txnNumber); + txnRecord.setLastWriteOpTime(repl::OpTime(Timestamp(12, 34), 5)); + txnRecord.setLastWriteDate(Date_t::now()); + txnRecord.setState(DurableTxnStateEnum::kInProgress); + + DBDirectClient client(opCtx()); + client.insert(NamespaceString::kSessionTransactionsTableNamespace.ns(), txnRecord.toBSON()); + + SessionCatalogMigrationSource migrationSource(opCtx(), kNs, kChunkRange, kShardKey); + + ASSERT_FALSE(migrationSource.fetchNextOplog(opCtx())); + ASSERT_FALSE(migrationSource.hasMoreOplog()); + }; + + runTest(makeLogicalSessionIdForTest()); + runTest(makeLogicalSessionIdWithTxnUUIDForTest()); + runTest(makeLogicalSessionIdWithTxnNumberAndUUIDForTest()); +} + +TEST_F(SessionCatalogMigrationSourceTest, IgnoreAbortedTransaction) { + auto opTimeSecs = 200; + + auto runTest = [&](const LogicalSessionId& sessionId) { + const auto txnNumber = TxnNumber{1}; + + auto applyOpsOpTime = repl::OpTime(Timestamp(opTimeSecs, 1), 1); + auto op = makeDurableReplOp(repl::OpTypeEnum::kInsert, + kNs, + BSON("x" << 1), + BSONObj(), + isInternalSessionForRetryableWrite(sessionId) + ? std::vector<StmtId>{1} + : std::vector<StmtId>{}); + auto entry1 = makeApplyOpsOplogEntry(applyOpsOpTime, + {}, // prevOpTime + {op}, + sessionId, + txnNumber, + true, // isPrepare + false); // isPartial + insertOplogEntry(entry1); + + auto abortOpTime = repl::OpTime(Timestamp(opTimeSecs, 2), 1); + auto entry2 = makeCommandOplogEntry( + abortOpTime, applyOpsOpTime, BSON("abortTransaction" << 1), sessionId, txnNumber); + insertOplogEntry(entry2); + + SessionTxnRecord txnRecord; + txnRecord.setSessionId(sessionId); + txnRecord.setTxnNum(txnNumber); + txnRecord.setLastWriteOpTime(abortOpTime); + txnRecord.setLastWriteDate(Date_t::now()); + txnRecord.setState(DurableTxnStateEnum::kAborted); + + DBDirectClient client(opCtx()); + client.insert(NamespaceString::kSessionTransactionsTableNamespace.ns(), txnRecord.toBSON()); + + SessionCatalogMigrationSource migrationSource(opCtx(), kNs, kChunkRange, kShardKey); + + ASSERT_FALSE(migrationSource.fetchNextOplog(opCtx())); + ASSERT_FALSE(migrationSource.hasMoreOplog()); + + opTimeSecs++; + }; + + runTest(makeLogicalSessionIdForTest()); + runTest(makeLogicalSessionIdWithTxnUUIDForTest()); + runTest(makeLogicalSessionIdWithTxnNumberAndUUIDForTest()); +} + TEST_F(SessionCatalogMigrationSourceTest, MixedTransactionEntriesAndRetryableWritesEntriesReturnCorrectResults) { const auto sessionId1 = makeLogicalSessionIdForTest(); |