diff options
author | Cheahuychou Mao <mao.cheahuychou@gmail.com> | 2022-03-04 17:10:18 +0000 |
---|---|---|
committer | Evergreen Agent <no-reply@evergreen.mongodb.com> | 2022-03-04 18:18:25 +0000 |
commit | 97d47a47ed9629ae4206a7b8e20be61aef8d17ec (patch) | |
tree | 41ec45dc8c6e7ea6e07602fdf621be227ee317c4 /src/mongo/db/s/resharding | |
parent | a14ca1e9d6a71f76621f974ecca663ae5f0fd246 (diff) | |
download | mongo-97d47a47ed9629ae4206a7b8e20be61aef8d17ec.tar.gz |
SERVER-63441 Handle retryable internal transactions with multiple oplog entries when migrating sessions during resharding
Diffstat (limited to 'src/mongo/db/s/resharding')
9 files changed, 534 insertions, 148 deletions
diff --git a/src/mongo/db/s/resharding/resharding_data_copy_util.cpp b/src/mongo/db/s/resharding/resharding_data_copy_util.cpp index e000d42f5ae..8be9fbb4669 100644 --- a/src/mongo/db/s/resharding/resharding_data_copy_util.cpp +++ b/src/mongo/db/s/resharding/resharding_data_copy_util.cpp @@ -307,13 +307,16 @@ void updateSessionRecord(OperationContext* opCtx, auto txnParticipant = TransactionParticipant::get(opCtx); invariant(txnParticipant, "Must be called with session checked out"); + const auto sessionId = *opCtx->getLogicalSessionId(); + const auto txnNumber = *opCtx->getTxnNumber(); + repl::MutableOplogEntry oplogEntry; oplogEntry.setOpType(repl::OpTypeEnum::kNoop); oplogEntry.setObject(SessionCatalogMigration::kSessionOplogTag); oplogEntry.setObject2(std::move(o2Field)); oplogEntry.setNss({}); - oplogEntry.setSessionId(opCtx->getLogicalSessionId()); - oplogEntry.setTxnNumber(opCtx->getTxnNumber()); + oplogEntry.setSessionId(sessionId); + oplogEntry.setTxnNumber(txnNumber); oplogEntry.setStatementIds(stmtIds); oplogEntry.setPreImageOpTime(std::move(preImageOpTime)); oplogEntry.setPostImageOpTime(std::move(postImageOpTime)); @@ -321,37 +324,33 @@ void updateSessionRecord(OperationContext* opCtx, oplogEntry.setWallClockTime(opCtx->getServiceContext()->getFastClockSource()->now()); oplogEntry.setFromMigrate(true); - writeConflictRetry(opCtx, - "resharding::data_copy::updateSessionRecord", - NamespaceString::kSessionTransactionsTableNamespace.ns(), - [&] { - AutoGetOplog oplogWrite(opCtx, OplogAccessMode::kWrite); - - WriteUnitOfWork wuow(opCtx); - repl::OpTime opTime = repl::logOp(opCtx, &oplogEntry); - - uassert(4989901, - str::stream() << "Failed to create new oplog entry: " - << redact(oplogEntry.toBSON()), - !opTime.isNull()); - - // Use the same wallTime as the oplog entry since SessionUpdateTracker - // looks at the oplog entry wallTime when replicating. 
- SessionTxnRecord sessionTxnRecord(*oplogEntry.getSessionId(), - *oplogEntry.getTxnNumber(), - std::move(opTime), - oplogEntry.getWallClockTime()); - - if (isInternalSessionForRetryableWrite(*oplogEntry.getSessionId())) { - sessionTxnRecord.setParentSessionId( - *getParentSessionId(*oplogEntry.getSessionId())); - } - - txnParticipant.onRetryableWriteCloningCompleted( - opCtx, stmtIds, sessionTxnRecord); - - wuow.commit(); - }); + writeConflictRetry( + opCtx, + "resharding::data_copy::updateSessionRecord", + NamespaceString::kSessionTransactionsTableNamespace.ns(), + [&] { + AutoGetOplog oplogWrite(opCtx, OplogAccessMode::kWrite); + + WriteUnitOfWork wuow(opCtx); + repl::OpTime opTime = repl::logOp(opCtx, &oplogEntry); + + uassert(4989901, + str::stream() << "Failed to create new oplog entry: " + << redact(oplogEntry.toBSON()), + !opTime.isNull()); + + // Use the same wallTime as the oplog entry since SessionUpdateTracker + // looks at the oplog entry wallTime when replicating. + SessionTxnRecord sessionTxnRecord( + sessionId, txnNumber, std::move(opTime), oplogEntry.getWallClockTime()); + if (isInternalSessionForRetryableWrite(sessionId)) { + sessionTxnRecord.setParentSessionId(*getParentSessionId(sessionId)); + } + + txnParticipant.onRetryableWriteCloningCompleted(opCtx, stmtIds, sessionTxnRecord); + + wuow.commit(); + }); } } // namespace mongo::resharding::data_copy diff --git a/src/mongo/db/s/resharding/resharding_oplog_applier.cpp b/src/mongo/db/s/resharding/resharding_oplog_applier.cpp index b65d5376689..452cef3ea9a 100644 --- a/src/mongo/db/s/resharding/resharding_oplog_applier.cpp +++ b/src/mongo/db/s/resharding/resharding_oplog_applier.cpp @@ -78,8 +78,8 @@ SemiFuture<void> ReshardingOplogApplier::_applyBatch( CancellationToken cancelToken, CancelableOperationContextFactory factory) { Timer latencyTimer; - auto crudWriterVectors = - _batchPreparer.makeCrudOpWriterVectors(_currentBatchToApply, _currentDerivedOps); + auto crudWriterVectors = 
_batchPreparer.makeCrudOpWriterVectors( + _currentBatchToApply, _currentDerivedOpsForCrudWriters); CancellationSource errorSource(cancelToken); @@ -99,7 +99,8 @@ SemiFuture<void> ReshardingOplogApplier::_applyBatch( } } - auto sessionWriterVectors = _batchPreparer.makeSessionOpWriterVectors(_currentBatchToApply); + auto sessionWriterVectors = _batchPreparer.makeSessionOpWriterVectors( + _currentBatchToApply, _currentDerivedOpsForSessionWriters); batchApplierFutures.reserve(crudWriterVectors.size() + sessionWriterVectors.size()); for (auto&& writer : sessionWriterVectors) { @@ -152,7 +153,8 @@ SemiFuture<void> ReshardingOplogApplier::run( "reshardingApplyOplogBatchTwice failpoint enabled, applying batch " "a second time", "batchSize"_attr = _currentBatchToApply.size()); - _currentDerivedOps.clear(); + _currentDerivedOpsForCrudWriters.clear(); + _currentDerivedOpsForSessionWriters.clear(); return _applyBatch(executor, cancelToken, factory); } return SemiFuture<void>(); @@ -240,7 +242,8 @@ void ReshardingOplogApplier::_clearAppliedOpsAndStoreProgress(OperationContext* } _currentBatchToApply.clear(); - _currentDerivedOps.clear(); + _currentDerivedOpsForCrudWriters.clear(); + _currentDerivedOpsForSessionWriters.clear(); } NamespaceString ReshardingOplogApplier::ensureStashCollectionExists( diff --git a/src/mongo/db/s/resharding/resharding_oplog_applier.h b/src/mongo/db/s/resharding/resharding_oplog_applier.h index 0c3a8bc8306..f7835c2ee64 100644 --- a/src/mongo/db/s/resharding/resharding_oplog_applier.h +++ b/src/mongo/db/s/resharding/resharding_oplog_applier.h @@ -146,7 +146,8 @@ private: OplogBatch _currentBatchToApply; // Buffer for internally generated oplog entries that needs to be processed for this batch. - std::list<repl::OplogEntry> _currentDerivedOps; + std::list<repl::OplogEntry> _currentDerivedOpsForCrudWriters; + std::list<repl::OplogEntry> _currentDerivedOpsForSessionWriters; // The source of the oplog entries to be applied. 
std::unique_ptr<ReshardingDonorOplogIteratorInterface> _oplogIter; diff --git a/src/mongo/db/s/resharding/resharding_oplog_batch_preparer.cpp b/src/mongo/db/s/resharding/resharding_oplog_batch_preparer.cpp index a6f73b01cec..bde8901926a 100644 --- a/src/mongo/db/s/resharding/resharding_oplog_batch_preparer.cpp +++ b/src/mongo/db/s/resharding/resharding_oplog_batch_preparer.cpp @@ -50,8 +50,7 @@ namespace { * Return true if we need to update config.transactions collection for this oplog entry. */ bool shouldUpdateTxnTable(const repl::OplogEntry& op) { - if (op.getCommandType() == repl::OplogEntry::CommandType::kCommitTransaction || - op.getCommandType() == repl::OplogEntry::CommandType::kAbortTransaction) { + if (op.getCommandType() == repl::OplogEntry::CommandType::kAbortTransaction) { return true; } @@ -64,8 +63,21 @@ bool shouldUpdateTxnTable(const repl::OplogEntry& op) { } if (op.getCommandType() == repl::OplogEntry::CommandType::kApplyOps) { - auto applyOpsInfo = repl::ApplyOpsCommandInfo::parse(op.getObject()); - return !applyOpsInfo.getPrepare() && !applyOpsInfo.getPartialTxn(); + // This applyOps oplog entry is guaranteed to correspond to a committed transaction since + // the resharding aggregation pipeline does not output applyOps oplog entries for aborted + // transactions (i.e. it only outputs the abortTransaction oplog entry). + + if (isInternalSessionForRetryableWrite(*op.getSessionId())) { + // For a retryable internal transaction, we need to update the config.transactions + // collection upon writing the noop oplog entries for retryable operations contained + // within each applyOps oplog entry. + return true; + } + + // The resharding aggregation pipeline also does not output the commitTransaction oplog + // entry so for a non-retryable transaction, we need to perform the update to the + // config.transactions collection upon seeing the final applyOps oplog entry. 
+ return !op.isPartialTransaction(); } return false; @@ -116,6 +128,8 @@ WriterVectors ReshardingOplogBatchPreparer::makeCrudOpWriterVectors( } auto applyOpsInfo = repl::ApplyOpsCommandInfo::parse(op.getObject()); + // TODO (SERVER-63880): Make resharding handle applyOps oplog entries with + // WouldChangeOwningShard sentinel noop entry. uassert( ErrorCodes::OplogOperationUnsupported, str::stream() << "Commands within applyOps are not supported during resharding: " @@ -148,7 +162,7 @@ WriterVectors ReshardingOplogBatchPreparer::makeCrudOpWriterVectors( } WriterVectors ReshardingOplogBatchPreparer::makeSessionOpWriterVectors( - const OplogBatchToPrepare& batch) const { + const OplogBatchToPrepare& batch, std::list<OplogEntry>& derivedOps) const { auto writerVectors = _makeEmptyWriterVectors(); struct SessionOpsList { @@ -188,7 +202,52 @@ WriterVectors ReshardingOplogBatchPreparer::makeSessionOpWriterVectors( } else if (op.isCommand()) { throwIfUnsupportedCommandOp(op); - if (shouldUpdateTxnTable(op)) { + if (!shouldUpdateTxnTable(op)) { + continue; + } + + auto sessionId = *op.getSessionId(); + + if (isInternalSessionForRetryableWrite(sessionId) && + op.getCommandType() == OplogEntry::CommandType::kApplyOps) { + // Derive retryable write CRUD oplog entries from this retryable internal + // transaction applyOps oplog entry. + + auto applyOpsInfo = repl::ApplyOpsCommandInfo::parse(op.getObject()); + // TODO (SERVER-63880): Make resharding handle applyOps oplog entries with + // WouldChangeOwningShard sentinel noop entry. 
+ uassert(ErrorCodes::OplogOperationUnsupported, + str::stream() + << "Commands within applyOps are not supported during resharding: " + << redact(op.toBSONForLogging()), + applyOpsInfo.areOpsCrudOnly()); + + auto unrolledOp = + uassertStatusOK(repl::MutableOplogEntry::parse(op.getEntry().toBSON())); + unrolledOp.setSessionId(*getParentSessionId(sessionId)); + unrolledOp.setTxnNumber(*sessionId.getTxnNumber()); + + for (const auto& innerOp : applyOpsInfo.getOperations()) { + auto replOp = repl::ReplOperation::parse( + {"ReshardingOplogBatchPreparer::makeSessionOpWriterVectors innerOp"}, + innerOp); + if (replOp.getStatementIds().empty()) { + // Skip this operation since it is not retryable. + continue; + } + unrolledOp.setDurableReplOperation(replOp); + + // There isn't a direct way to convert from a MutableOplogEntry to a + // DurableOplogEntry or OplogEntry. We serialize the unrolledOp to have it get + // re-parsed into an OplogEntry. + auto& derivedOp = derivedOps.emplace_back(unrolledOp.toBSON()); + invariant(derivedOp.isCrudOpType()); + + // `&derivedOp` is guaranteed to remain stable while we append more derived + // oplog entries because `derivedOps` is a std::list. + updateSessionTracker(&derivedOp); + } + } else { updateSessionTracker(&op); } } else { diff --git a/src/mongo/db/s/resharding/resharding_oplog_batch_preparer.h b/src/mongo/db/s/resharding/resharding_oplog_batch_preparer.h index f4f035ccdbd..27e7934b326 100644 --- a/src/mongo/db/s/resharding/resharding_oplog_batch_preparer.h +++ b/src/mongo/db/s/resharding/resharding_oplog_batch_preparer.h @@ -82,11 +82,13 @@ public: * to the config.transactions record for a higher txnNumber will cause any updates in `batch` * for lower txnNumbers to be elided. * - * The returned writer vectors refer to memory owned by `batch`. 
The caller must take care to - * ensure `batch` outlives the writer vectors all being applied and must take care not to modify - * `batch` until after the writer vectors have all been applied. + * The returned writer vectors refer to memory owned by `batch` and `derivedOps`. The caller + * must take care to ensure both `batch` and `derivedOps` outlive the writer vectors all being + * applied and must take care not to modify `batch` or `derivedOps` until after the writer + * vectors have all been applied. */ - WriterVectors makeSessionOpWriterVectors(const OplogBatchToPrepare& batch) const; + WriterVectors makeSessionOpWriterVectors(const OplogBatchToPrepare& batch, + std::list<OplogEntry>& derivedOps) const; static void throwIfUnsupportedCommandOp(const OplogEntry& op); diff --git a/src/mongo/db/s/resharding/resharding_oplog_batch_preparer_test.cpp b/src/mongo/db/s/resharding/resharding_oplog_batch_preparer_test.cpp index 07ae756eeae..c98104fcf3a 100644 --- a/src/mongo/db/s/resharding/resharding_oplog_batch_preparer_test.cpp +++ b/src/mongo/db/s/resharding/resharding_oplog_batch_preparer_test.cpp @@ -31,6 +31,7 @@ #include <boost/optional/optional_io.hpp> +#include "mongo/db/logical_session_id_helpers.h" #include "mongo/db/query/collation/collator_interface.h" #include "mongo/db/s/resharding/resharding_oplog_batch_preparer.h" #include "mongo/db/s/resharding/resharding_server_parameters_gen.h" @@ -66,25 +67,42 @@ protected: return {op.toBSON()}; } - repl::OplogEntry makeApplyOps(BSONObj document, - bool isPrepare, - bool isPartial, - boost::optional<LogicalSessionId> lsid, - boost::optional<TxnNumber> txnNumber) { - - std::vector<mongo::BSONObj> operations; - auto insertOp = repl::MutableOplogEntry::makeInsertOperation( - NamespaceString("foo.bar"), UUID::gen(), document, document); - + /** + * Returns an applyOps oplog entry containing insert operations for the given documents. 
If the + * session is an internal session for retryable writes, uses the "_id" of each document as its + * statement id. + */ + repl::OplogEntry makeApplyOpsForInsert(const std::vector<BSONObj> documents, + boost::optional<LogicalSessionId> lsid = boost::none, + boost::optional<TxnNumber> txnNumber = boost::none, + boost::optional<bool> isPrepare = boost::none, + boost::optional<bool> isPartial = boost::none) { BSONObjBuilder applyOpsBuilder; - applyOpsBuilder.append("applyOps", BSON_ARRAY(insertOp.toBSON())); + + BSONArrayBuilder opsArrayBuilder = applyOpsBuilder.subarrayStart("applyOps"); + for (const auto& document : documents) { + auto insertOp = repl::DurableReplOperation(repl::OpTypeEnum::kInsert, {}, document); + if (lsid && isInternalSessionForRetryableWrite(*lsid)) { + if (!document.hasField("_id")) { + continue; + } + auto id = document.getIntField("_id"); + insertOp.setStatementIds({{id}}); + } + opsArrayBuilder.append(insertOp.toBSON()); + } + opsArrayBuilder.done(); if (isPrepare) { - applyOpsBuilder.append(repl::ApplyOpsCommandInfoBase::kPrepareFieldName, true); + invariant(lsid); + invariant(txnNumber); + applyOpsBuilder.append(repl::ApplyOpsCommandInfoBase::kPrepareFieldName, *isPrepare); } if (isPartial) { - applyOpsBuilder.append(repl::ApplyOpsCommandInfoBase::kPartialTxnFieldName, true); + invariant(lsid); + invariant(txnNumber); + applyOpsBuilder.append(repl::ApplyOpsCommandInfoBase::kPartialTxnFieldName, *isPartial); } repl::MutableOplogEntry op; @@ -194,24 +212,13 @@ TEST_F(ReshardingOplogBatchPreparerTest, CreatesDerivedCrudOpsForApplyOps) { // We use the "fromApplyOps" field in the document to distinguish between the regular oplog // entries from the derived ones later on. 
int numOps = 20; + std::vector<BSONObj> docsForApplyOps; for (int i = 0; i < numOps; ++i) { batch.emplace_back(makeUpdateOp(BSON("_id" << i << "fromApplyOps" << false))); + docsForApplyOps.push_back(BSON("_id" << i << "fromApplyOps" << true)); } - BSONObjBuilder applyOpsBuilder; - { - BSONArrayBuilder opsArrayBuilder = applyOpsBuilder.subarrayStart("applyOps"); - for (int i = 0; i < numOps; ++i) { - // We use OpTypeEnum::kInsert rather than OpTypeEnum::kUpdate here to avoid needing to - // deal with setting the 'o2' field. - opsArrayBuilder.append( - repl::DurableReplOperation( - repl::OpTypeEnum::kInsert, {}, BSON("_id" << i << "fromApplyOps" << true)) - .toBSON()); - } - } - - batch.emplace_back(makeCommandOp(applyOpsBuilder.done())); + batch.emplace_back(makeApplyOpsForInsert(docsForApplyOps)); std::list<repl::OplogEntry> derivedOps; auto writerVectors = _batchPreparer.makeCrudOpWriterVectors(batch, derivedOps); @@ -254,12 +261,8 @@ TEST_F(ReshardingOplogBatchPreparerTest, InterleavesDerivedCrudOpsForApplyOps) { } else { // We use OpTypeEnum::kInsert rather than OpTypeEnum::kUpdate here to avoid needing to // deal with setting the 'o2' field. 
- batch.emplace_back(makeCommandOp(BSON( - "applyOps" << BSON_ARRAY(repl::DurableReplOperation( - repl::OpTypeEnum::kInsert, - {}, - BSON("_id" << 0 << "n" << i << "fromApplyOps" << true)) - .toBSON())))); + batch.emplace_back( + makeApplyOpsForInsert({BSON("_id" << 0 << "n" << i << "fromApplyOps" << true)})); } } @@ -290,8 +293,10 @@ TEST_F(ReshardingOplogBatchPreparerTest, AssignsSessionOpsToWriterVectorsByLsid) batch.emplace_back(makeUpdateOp(BSON("_id" << i), lsid, TxnNumber{1})); } - auto writerVectors = _batchPreparer.makeSessionOpWriterVectors(batch); + std::list<repl::OplogEntry> derivedOps; + auto writerVectors = _batchPreparer.makeSessionOpWriterVectors(batch, derivedOps); ASSERT_EQ(writerVectors.size(), kNumWriterVectors); + ASSERT_EQ(derivedOps.size(), 0U); auto writer = getNonEmptyWriterVector(writerVectors); ASSERT_EQ(writer.size(), numOps); @@ -311,8 +316,10 @@ TEST_F(ReshardingOplogBatchPreparerTest, DiscardsLowerTxnNumberSessionOps) { batch.emplace_back(makeUpdateOp(BSON("_id" << i), lsid, TxnNumber{i})); } - auto writerVectors = _batchPreparer.makeSessionOpWriterVectors(batch); + std::list<repl::OplogEntry> derivedOps; + auto writerVectors = _batchPreparer.makeSessionOpWriterVectors(batch, derivedOps); ASSERT_EQ(writerVectors.size(), kNumWriterVectors); + ASSERT_EQ(derivedOps.size(), 0U); auto writer = getNonEmptyWriterVector(writerVectors); ASSERT_EQ(writer.size(), 1U); @@ -330,7 +337,8 @@ TEST_F(ReshardingOplogBatchPreparerTest, DistributesSessionOpsToWriterVectorsFai makeUpdateOp(BSON("_id" << i), makeLogicalSessionIdForTest(), TxnNumber{1})); } - auto writerVectors = _batchPreparer.makeSessionOpWriterVectors(batch); + std::list<repl::OplogEntry> derivedOps; + auto writerVectors = _batchPreparer.makeSessionOpWriterVectors(batch, derivedOps); ASSERT_EQ(writerVectors.size(), kNumWriterVectors); // Use `numOps / 5` as a generous definition for "fair". 
There's no guarantee for how the lsid @@ -357,107 +365,319 @@ TEST_F(ReshardingOplogBatchPreparerTest, ThrowsForUnsupportedCommandOps) { batch.emplace_back(makeCommandOp(BSON("commitIndexBuild" << 1))); std::list<repl::OplogEntry> derivedOps; - ASSERT_THROWS_CODE(_batchPreparer.makeSessionOpWriterVectors(batch), + ASSERT_THROWS_CODE(_batchPreparer.makeSessionOpWriterVectors(batch, derivedOps), DBException, ErrorCodes::OplogOperationUnsupported); } } TEST_F(ReshardingOplogBatchPreparerTest, DiscardsNoops) { - OplogBatch batch; - - int numOps = 5; - for (int i = 0; i < numOps; ++i) { - repl::MutableOplogEntry op; - op.setOpType(repl::OpTypeEnum::kNoop); - op.setObject({}); - op.setNss({}); - op.setOpTime({{}, {}}); - op.setWallClockTime({}); - batch.emplace_back(op.toBSON()); - } + auto runTest = [&](const boost::optional<LogicalSessionId>& lsid, + const boost::optional<TxnNumber>& txnNumber) { + OplogBatch batch; - std::list<repl::OplogEntry> derivedOps; - auto writerVectors = _batchPreparer.makeCrudOpWriterVectors(batch, derivedOps); - ASSERT_EQ(writerVectors.size(), kNumWriterVectors); - ASSERT_EQ(derivedOps.size(), 0U); - ASSERT_EQ(writerVectors[0].size(), 0U); - ASSERT_EQ(writerVectors[1].size(), 0U); + int numOps = 5; + for (int i = 0; i < numOps; ++i) { + repl::MutableOplogEntry op; + op.setSessionId(lsid); + op.setTxnNumber(txnNumber); + op.setOpType(repl::OpTypeEnum::kNoop); + op.setObject({}); + op.setNss({}); + op.setOpTime({{}, {}}); + op.setWallClockTime({}); + batch.emplace_back(op.toBSON()); + } - writerVectors = _batchPreparer.makeSessionOpWriterVectors(batch); - ASSERT_EQ(writerVectors.size(), kNumWriterVectors); - ASSERT_EQ(writerVectors[0].size(), 0U); - ASSERT_EQ(writerVectors[1].size(), 0U); + std::list<repl::OplogEntry> derivedOpsForCrudWriters; + auto writerVectors = + _batchPreparer.makeCrudOpWriterVectors(batch, derivedOpsForCrudWriters); + ASSERT_EQ(writerVectors.size(), kNumWriterVectors); + ASSERT_EQ(derivedOpsForCrudWriters.size(), 0U); + 
ASSERT_EQ(writerVectors[0].size(), 0U); + ASSERT_EQ(writerVectors[1].size(), 0U); + + std::list<repl::OplogEntry> derivedOpsForSessionWriters; + writerVectors = + _batchPreparer.makeSessionOpWriterVectors(batch, derivedOpsForSessionWriters); + ASSERT_EQ(writerVectors.size(), kNumWriterVectors); + ASSERT_EQ(derivedOpsForSessionWriters.size(), 0U); + ASSERT_EQ(writerVectors[0].size(), 0U); + ASSERT_EQ(writerVectors[1].size(), 0U); + }; + + runTest(boost::none, boost::none); + + TxnNumber txnNumber{1}; + runTest(makeLogicalSessionIdForTest(), txnNumber); + runTest(makeLogicalSessionIdWithTxnUUIDForTest(), txnNumber); + runTest(makeLogicalSessionIdWithTxnNumberAndUUIDForTest(), txnNumber); } TEST_F(ReshardingOplogBatchPreparerTest, SessionWriteVectorsForApplyOpsWithoutTxnNumber) { OplogBatch batch; - batch.emplace_back(makeApplyOps(BSON("_id" << 3), false, false, boost::none, boost::none)); + batch.emplace_back(makeApplyOpsForInsert({BSON("_id" << 0)})); - auto writerVectors = _batchPreparer.makeSessionOpWriterVectors(batch); + std::list<repl::OplogEntry> derivedOps; + auto writerVectors = _batchPreparer.makeSessionOpWriterVectors(batch, derivedOps); + ASSERT_EQ(derivedOps.size(), 0U); for (const auto& writer : writerVectors) { ASSERT_TRUE(writer.empty()); } } -TEST_F(ReshardingOplogBatchPreparerTest, SessionWriteVectorsForSmallUnpreparedTxn) { - OplogBatch batch; - auto lsid = makeLogicalSessionIdForTest(); - batch.emplace_back(makeApplyOps(BSON("_id" << 3), false, false, lsid, 2)); +TEST_F(ReshardingOplogBatchPreparerTest, + SessionWriteVectorsDeriveCrudOpsForApplyOpsForRetryableInternalTransaction) { + const auto lsid = makeLogicalSessionIdWithTxnNumberAndUUIDForTest(); + const TxnNumber txnNumber{1}; + + OplogBatch batch; + // 'makeApplyOpsForInsert' uses the "_id" of each document as the "stmtId" for its insert + // operation. The insert operation without a stmtId should not have a derived operation. 
+ batch.emplace_back(makeApplyOpsForInsert({BSON("_id" << 0), BSONObj(), BSON("_id" << 1)}, + lsid, + txnNumber, + false /* isPrepare */, + false /* isPartial */)); - auto writerVectors = _batchPreparer.makeSessionOpWriterVectors(batch); + std::list<repl::OplogEntry> derivedOps; + auto writerVectors = _batchPreparer.makeSessionOpWriterVectors(batch, derivedOps); ASSERT_FALSE(writerVectors.empty()); auto writer = getNonEmptyWriterVector(writerVectors); - ASSERT_EQ(writer.size(), 1U); - ASSERT_EQ(writer[0]->getSessionId(), lsid); - ASSERT_EQ(*writer[0]->getTxnNumber(), 2); + + ASSERT_EQ(writer.size(), 2U); + ASSERT_EQ(derivedOps.size(), 2U); + for (size_t i = 0; i < writer.size(); ++i) { + ASSERT_EQ(writer[i]->getSessionId(), *getParentSessionId(lsid)); + ASSERT_EQ(*writer[i]->getTxnNumber(), *lsid.getTxnNumber()); + ASSERT(writer[i]->getOpType() == repl::OpTypeEnum::kInsert); + ASSERT_BSONOBJ_EQ(writer[i]->getObject(), (BSON("_id" << static_cast<int>(i)))); + } } -TEST_F(ReshardingOplogBatchPreparerTest, SessionWriteVectorsForCommittedTxn) { - OplogBatch batch; - auto lsid = makeLogicalSessionIdForTest(); +TEST_F(ReshardingOplogBatchPreparerTest, SessionWriteVectorsForSmallUnpreparedTxn) { + auto runTest = [&](const LogicalSessionId& lsid) { + const TxnNumber txnNumber{1}; - batch.emplace_back(makeApplyOps(BSON("_id" << 3), true, false, lsid, 2)); - batch.emplace_back(makeCommandOp(BSON("commitTransaction" << 1), lsid, 2)); + OplogBatch batch; + batch.emplace_back(makeApplyOpsForInsert({BSON("_id" << 0), BSON("_id" << 1)}, + lsid, + txnNumber, + false /* isPrepare */, + false /* isPartial */)); - auto writerVectors = _batchPreparer.makeSessionOpWriterVectors(batch); - ASSERT_FALSE(writerVectors.empty()); + std::list<repl::OplogEntry> derivedOps; + auto writerVectors = _batchPreparer.makeSessionOpWriterVectors(batch, derivedOps); + ASSERT_FALSE(writerVectors.empty()); + + auto writer = getNonEmptyWriterVector(writerVectors); + + if 
(isInternalSessionForRetryableWrite(lsid)) { + ASSERT_EQ(writer.size(), 2U); + ASSERT_EQ(derivedOps.size(), 2U); + for (size_t i = 0; i < writer.size(); ++i) { + ASSERT_EQ(writer[i]->getSessionId(), *getParentSessionId(lsid)); + ASSERT_EQ(*writer[i]->getTxnNumber(), *lsid.getTxnNumber()); + ASSERT(writer[i]->getOpType() == repl::OpTypeEnum::kInsert); + ASSERT_BSONOBJ_EQ(writer[i]->getObject(), (BSON("_id" << static_cast<int>(i)))); + } + } else { + ASSERT_EQ(writer.size(), 1U); + ASSERT_EQ(derivedOps.size(), 0U); + ASSERT_EQ(writer[0]->getSessionId(), lsid); + ASSERT_EQ(*writer[0]->getTxnNumber(), txnNumber); + ASSERT(writer[0]->getCommandType() == repl::OplogEntry::CommandType::kApplyOps); + } + }; - auto writer = getNonEmptyWriterVector(writerVectors); - ASSERT_EQ(writer.size(), 1U); - ASSERT_EQ(writer[0]->getSessionId(), lsid); - ASSERT_EQ(*writer[0]->getTxnNumber(), 2); + runTest(makeLogicalSessionIdForTest()); + runTest(makeLogicalSessionIdWithTxnUUIDForTest()); + runTest(makeLogicalSessionIdWithTxnNumberAndUUIDForTest()); } -TEST_F(ReshardingOplogBatchPreparerTest, SessionWriteVectorsForAbortedPreparedTxn) { - OplogBatch batch; - auto lsid = makeLogicalSessionIdForTest(); +TEST_F(ReshardingOplogBatchPreparerTest, SessionWriteVectorsForLargeUnpreparedTxn) { + auto runTest = [&](const LogicalSessionId& lsid) { + const TxnNumber txnNumber{1}; - batch.emplace_back(makeCommandOp(BSON("abortTransaction" << 1), lsid, 2)); + OplogBatch batch; + batch.emplace_back(makeApplyOpsForInsert({BSON("_id" << 0), BSON("_id" << 1)}, + lsid, + txnNumber, + false /* isPrepare */, + true /* isPartial */)); + batch.emplace_back(makeApplyOpsForInsert( + {BSON("_id" << 2)}, lsid, txnNumber, false /* isPrepare */, false /* isPartial */)); - auto writerVectors = _batchPreparer.makeSessionOpWriterVectors(batch); - ASSERT_FALSE(writerVectors.empty()); + std::list<repl::OplogEntry> derivedOps; + auto writerVectors = _batchPreparer.makeSessionOpWriterVectors(batch, derivedOps); + 
ASSERT_FALSE(writerVectors.empty()); + + auto writer = getNonEmptyWriterVector(writerVectors); + + if (isInternalSessionForRetryableWrite(lsid)) { + ASSERT_EQ(writer.size(), 3U); + ASSERT_EQ(derivedOps.size(), 3U); + for (size_t i = 0; i < writer.size(); ++i) { + ASSERT_EQ(writer[i]->getSessionId(), *getParentSessionId(lsid)); + ASSERT_EQ(*writer[i]->getTxnNumber(), *lsid.getTxnNumber()); + ASSERT(writer[i]->getOpType() == repl::OpTypeEnum::kInsert); + ASSERT_BSONOBJ_EQ(writer[i]->getObject(), (BSON("_id" << static_cast<int>(i)))); + } + } else { + ASSERT_EQ(writer.size(), 1U); + ASSERT_EQ(derivedOps.size(), 0U); + ASSERT_EQ(writer[0]->getSessionId(), lsid); + ASSERT_EQ(*writer[0]->getTxnNumber(), txnNumber); + ASSERT(writer[0]->getCommandType() == repl::OplogEntry::CommandType::kApplyOps); + } + }; - auto writer = getNonEmptyWriterVector(writerVectors); - ASSERT_EQ(writer.size(), 1U); - ASSERT_EQ(writer[0]->getSessionId(), lsid); - ASSERT_EQ(*writer[0]->getTxnNumber(), 2); + runTest(makeLogicalSessionIdForTest()); + runTest(makeLogicalSessionIdWithTxnUUIDForTest()); + runTest(makeLogicalSessionIdWithTxnNumberAndUUIDForTest()); +} + +TEST_F(ReshardingOplogBatchPreparerTest, SessionWriteVectorsForSmallCommittedPreparedTxn) { + auto runTest = [&](const LogicalSessionId& lsid) { + const TxnNumber txnNumber{1}; + + OplogBatch batch; + batch.emplace_back(makeApplyOpsForInsert({BSON("_id" << 0), BSON("_id" << 1)}, + lsid, + txnNumber, + true /* isPrepare */, + false /* isPartial */)); + batch.emplace_back(makeCommandOp(BSON("commitTransaction" << 1), lsid, txnNumber)); + + std::list<repl::OplogEntry> derivedOps; + auto writerVectors = _batchPreparer.makeSessionOpWriterVectors(batch, derivedOps); + ASSERT_FALSE(writerVectors.empty()); + + auto writer = getNonEmptyWriterVector(writerVectors); + + if (isInternalSessionForRetryableWrite(lsid)) { + ASSERT_EQ(writer.size(), 2U); + ASSERT_EQ(derivedOps.size(), 2U); + for (size_t i = 0; i < writer.size(); ++i) { + 
ASSERT_EQ(writer[i]->getSessionId(), *getParentSessionId(lsid)); + ASSERT_EQ(*writer[i]->getTxnNumber(), *lsid.getTxnNumber()); + ASSERT(writer[i]->getOpType() == repl::OpTypeEnum::kInsert); + ASSERT_BSONOBJ_EQ(writer[i]->getObject(), (BSON("_id" << static_cast<int>(i)))); + } + } else { + ASSERT_EQ(writer.size(), 1U); + ASSERT_EQ(derivedOps.size(), 0U); + ASSERT_EQ(writer[0]->getSessionId(), lsid); + ASSERT_EQ(*writer[0]->getTxnNumber(), txnNumber); + ASSERT(writer[0]->getCommandType() == repl::OplogEntry::CommandType::kApplyOps); + } + }; + + runTest(makeLogicalSessionIdForTest()); + runTest(makeLogicalSessionIdWithTxnUUIDForTest()); + runTest(makeLogicalSessionIdWithTxnNumberAndUUIDForTest()); +} + +TEST_F(ReshardingOplogBatchPreparerTest, SessionWriteVectorsForLargeCommittedPreparedTxn) { + auto runTest = [&](const LogicalSessionId& lsid) { + const TxnNumber txnNumber{1}; + + OplogBatch batch; + batch.emplace_back(makeApplyOpsForInsert({BSON("_id" << 0), BSON("_id" << 1)}, + lsid, + txnNumber, + false /* isPrepare */, + true /* isPartial */)); + batch.emplace_back(makeApplyOpsForInsert( + {BSON("_id" << 2)}, lsid, txnNumber, true /* isPrepare */, false /* isPartial */)); + batch.emplace_back(makeCommandOp(BSON("commitTransaction" << 1), lsid, txnNumber)); + + std::list<repl::OplogEntry> derivedOps; + auto writerVectors = _batchPreparer.makeSessionOpWriterVectors(batch, derivedOps); + ASSERT_FALSE(writerVectors.empty()); + + auto writer = getNonEmptyWriterVector(writerVectors); + + if (isInternalSessionForRetryableWrite(lsid)) { + ASSERT_EQ(writer.size(), 3U); + ASSERT_EQ(derivedOps.size(), 3U); + for (size_t i = 0; i < writer.size(); ++i) { + ASSERT_EQ(writer[i]->getSessionId(), *getParentSessionId(lsid)); + ASSERT_EQ(*writer[i]->getTxnNumber(), *lsid.getTxnNumber()); + ASSERT(writer[i]->getOpType() == repl::OpTypeEnum::kInsert); + ASSERT_BSONOBJ_EQ(writer[i]->getObject(), (BSON("_id" << static_cast<int>(i)))); + } + } else { + ASSERT_EQ(writer.size(), 1U); + 
ASSERT_EQ(derivedOps.size(), 0U); + ASSERT_EQ(writer[0]->getSessionId(), lsid); + ASSERT_EQ(*writer[0]->getTxnNumber(), txnNumber); + ASSERT(writer[0]->getCommandType() == repl::OplogEntry::CommandType::kApplyOps); + } + }; + + runTest(makeLogicalSessionIdForTest()); + runTest(makeLogicalSessionIdWithTxnUUIDForTest()); + runTest(makeLogicalSessionIdWithTxnNumberAndUUIDForTest()); +} + +TEST_F(ReshardingOplogBatchPreparerTest, SessionWriteVectorsForAbortedPreparedTxn) { + auto runTest = [&](const LogicalSessionId& lsid) { + const TxnNumber txnNumber{1}; + + OplogBatch batch; + batch.emplace_back(makeCommandOp(BSON("abortTransaction" << 1), lsid, txnNumber)); + + std::list<repl::OplogEntry> derivedOps; + auto writerVectors = _batchPreparer.makeSessionOpWriterVectors(batch, derivedOps); + ASSERT_FALSE(writerVectors.empty()); + + auto writer = getNonEmptyWriterVector(writerVectors); + ASSERT_EQ(writer.size(), 1U); + ASSERT_EQ(derivedOps.size(), 0U); + ASSERT_EQ(writer[0]->getSessionId(), lsid); + ASSERT_EQ(*writer[0]->getTxnNumber(), txnNumber); + ASSERT(writer[0]->getCommandType() == repl::OplogEntry::CommandType::kAbortTransaction); + }; + + runTest(makeLogicalSessionIdForTest()); + runTest(makeLogicalSessionIdWithTxnUUIDForTest()); + runTest(makeLogicalSessionIdWithTxnNumberAndUUIDForTest()); } TEST_F(ReshardingOplogBatchPreparerTest, SessionWriteVectorsForPartialUnpreparedTxn) { - OplogBatch batch; - auto lsid = makeLogicalSessionIdForTest(); + auto runTest = [&](const LogicalSessionId& lsid) { + const TxnNumber txnNumber{1}; - batch.emplace_back(makeApplyOps(BSON("_id" << 3), false, true, lsid, 2)); + OplogBatch batch; + batch.emplace_back(makeApplyOpsForInsert( + {BSON("_id" << 0)}, lsid, txnNumber, false /* isPrepare */, true /* isPartial */)); - auto writerVectors = _batchPreparer.makeSessionOpWriterVectors(batch); + std::list<repl::OplogEntry> derivedOps; + auto writerVectors = _batchPreparer.makeSessionOpWriterVectors(batch, derivedOps); + if 
(isInternalSessionForRetryableWrite(lsid)) { + ASSERT_FALSE(writerVectors.empty()); + auto writer = getNonEmptyWriterVector(writerVectors); + ASSERT_EQ(writer.size(), 1U); + ASSERT_EQ(derivedOps.size(), 1U); + ASSERT_EQ(writer[0]->getSessionId(), *getParentSessionId(lsid)); + ASSERT_EQ(*writer[0]->getTxnNumber(), *lsid.getTxnNumber()); + ASSERT(writer[0]->getOpType() == repl::OpTypeEnum::kInsert); + ASSERT_BSONOBJ_EQ(writer[0]->getObject(), (BSON("_id" << 0))); + } else { + ASSERT_EQ(derivedOps.size(), 0U); + for (const auto& writer : writerVectors) { + ASSERT_TRUE(writer.empty()); + } + } + }; - for (const auto& writer : writerVectors) { - ASSERT_TRUE(writer.empty()); - } + runTest(makeLogicalSessionIdForTest()); + runTest(makeLogicalSessionIdWithTxnUUIDForTest()); + runTest(makeLogicalSessionIdWithTxnNumberAndUUIDForTest()); } } // namespace diff --git a/src/mongo/db/s/resharding/resharding_oplog_session_application.cpp b/src/mongo/db/s/resharding/resharding_oplog_session_application.cpp index 5d6b635e9e4..123a740f119 100644 --- a/src/mongo/db/s/resharding/resharding_oplog_session_application.cpp +++ b/src/mongo/db/s/resharding/resharding_oplog_session_application.cpp @@ -78,8 +78,29 @@ repl::OpTime ReshardingOplogSessionApplication::_logPrePostImage( } boost::optional<SharedSemiFuture<void>> ReshardingOplogSessionApplication::tryApplyOperation( - OperationContext* opCtx, const repl::OplogEntry& op) const { + OperationContext* opCtx, const mongo::repl::OplogEntry& op) const { + invariant(op.getSessionId()); + invariant(op.getTxnNumber()); + auto lsid = *op.getSessionId(); + if (isInternalSessionForNonRetryableWrite(lsid)) { + // TODO (SERVER-63877): Determine if resharding should migrate internal sessions for + // non-retryable writes. + return boost::none; + } + if (isInternalSessionForRetryableWrite(lsid)) { + // The oplog preparer should have turned each applyOps oplog entry for a retryable internal + // transaction into retryable write CRUD oplog entries. 
+ invariant(op.getCommandType() != repl::OplogEntry::CommandType::kApplyOps); + + if (op.getCommandType() == repl::OplogEntry::CommandType::kAbortTransaction) { + // Skip this oplog entry since there is no retryable write history to apply and writing + // a sentinel noop oplog entry would make retryable write statements that successfully + // executed outside of this internal transaction not retryable. + return boost::none; + } + } + auto txnNumber = *op.getTxnNumber(); bool isRetryableWrite = op.isCrudOpType(); @@ -88,6 +109,7 @@ boost::optional<SharedSemiFuture<void>> ReshardingOplogSessionApplication::tryAp auto stmtIds = isRetryableWrite ? op.getStatementIds() : std::vector<StmtId>{kIncompleteHistoryStmtId}; + invariant(!stmtIds.empty()); boost::optional<repl::OpTime> preImageOpTime; if (auto preImageOp = op.getPreImageOp()) { diff --git a/src/mongo/db/s/resharding/resharding_oplog_session_application_test.cpp b/src/mongo/db/s/resharding/resharding_oplog_session_application_test.cpp index 6c57d2271a5..1116a41ec2a 100644 --- a/src/mongo/db/s/resharding/resharding_oplog_session_application_test.cpp +++ b/src/mongo/db/s/resharding/resharding_oplog_session_application_test.cpp @@ -68,6 +68,8 @@ public: MongoDSessionCatalog::onStepUp(opCtx.get()); } + + serverGlobalParams.clusterRole = ClusterRole::ShardServer; } repl::OpTime insertSessionRecord(OperationContext* opCtx, @@ -947,5 +949,66 @@ TEST_F(ReshardingOplogSessionApplicationTest, IncomingTxnHasHigherTxnNumberThanP ASSERT_OK(hitPreparedTxn->getNoThrow()); } +TEST_F(ReshardingOplogSessionApplicationTest, IgnoreIncomingAbortedRetryableInternalTransaction) { + auto lsid = makeLogicalSessionIdWithTxnNumberAndUUIDForTest(); + + TxnNumber incomingTxnNumber = 100; + + auto opTime = [&] { + auto opCtx = makeOperationContext(); + return insertSessionRecord(opCtx.get(), makeLogicalSessionIdForTest(), 100, {3}); + }(); + + // 'makeFinishTxnOp' returns an abortTransaction oplog entry. 
+ auto oplogEntry = makeFinishTxnOp(lsid, incomingTxnNumber); + + { + auto opCtx = makeOperationContext(); + ReshardingOplogSessionApplication applier; + auto hitPreparedTxn = applier.tryApplyOperation(opCtx.get(), oplogEntry); + ASSERT_FALSE(bool(hitPreparedTxn)); + } + + { + auto opCtx = makeOperationContext(); + auto foundOps = findOplogEntriesNewerThan(opCtx.get(), opTime.getTimestamp()); + ASSERT_EQ(foundOps.size(), 0U); + + auto sessionTxnRecord = findSessionRecord(opCtx.get(), lsid); + ASSERT_FALSE(bool(sessionTxnRecord)); + } +} + +TEST_F(ReshardingOplogSessionApplicationTest, IgnoreIncomingNonRetryableInternalTransaction) { + // TODO (SERVER-63877): Determine if resharding should migrate internal sessions for + // non-retryable writes. + auto lsid = makeLogicalSessionIdWithTxnUUIDForTest(); + + TxnNumber incomingTxnNumber = 100; + + auto opTime = [&] { + auto opCtx = makeOperationContext(); + return insertSessionRecord(opCtx.get(), makeLogicalSessionIdForTest(), 100, {3}); + }(); + + auto oplogEntry = makeFinishTxnOp(lsid, incomingTxnNumber); + + { + auto opCtx = makeOperationContext(); + ReshardingOplogSessionApplication applier; + auto hitPreparedTxn = applier.tryApplyOperation(opCtx.get(), oplogEntry); + ASSERT_FALSE(bool(hitPreparedTxn)); + } + + { + auto opCtx = makeOperationContext(); + auto foundOps = findOplogEntriesNewerThan(opCtx.get(), opTime.getTimestamp()); + ASSERT_EQ(foundOps.size(), 0U); + + auto sessionTxnRecord = findSessionRecord(opCtx.get(), lsid); + ASSERT_FALSE(bool(sessionTxnRecord)); + } +} + } // namespace } // namespace mongo diff --git a/src/mongo/db/s/resharding/resharding_txn_cloner.cpp b/src/mongo/db/s/resharding/resharding_txn_cloner.cpp index cfb088a7655..a6d9836a334 100644 --- a/src/mongo/db/s/resharding/resharding_txn_cloner.cpp +++ b/src/mongo/db/s/resharding/resharding_txn_cloner.cpp @@ -167,8 +167,25 @@ boost::optional<SessionTxnRecord> ReshardingTxnCloner::_getNextRecord(OperationC 
boost::optional<SharedSemiFuture<void>> ReshardingTxnCloner::doOneRecord( OperationContext* opCtx, const SessionTxnRecord& donorRecord) { + auto sessionId = donorRecord.getSessionId(); + auto txnNumber = donorRecord.getTxnNum(); + + if (isInternalSessionForNonRetryableWrite(sessionId)) { + // TODO (SERVER-63877): Determine if resharding should migrate internal sessions for + // non-retryable writes. + return boost::none; + } + + if (isInternalSessionForRetryableWrite(sessionId)) { + // Turn this into write history for the retryable write that this internal transaction + // corresponds to in order to avoid making retryable internal transactions have a sentinel + // noop oplog entry at all. + txnNumber = *sessionId.getTxnNumber(); + sessionId = *getParentSessionId(sessionId); + } + return resharding::data_copy::withSessionCheckedOut( - opCtx, donorRecord.getSessionId(), donorRecord.getTxnNum(), boost::none /* stmtId */, [&] { + opCtx, sessionId, txnNumber, boost::none /* stmtId */, [&] { resharding::data_copy::updateSessionRecord(opCtx, TransactionParticipant::kDeadEndSentinel, {kIncompleteHistoryStmtId}, |