diff options
author | Max Hirschhorn <max.hirschhorn@mongodb.com> | 2022-03-06 01:06:10 +0000 |
---|---|---|
committer | Evergreen Agent <no-reply@evergreen.mongodb.com> | 2022-03-06 01:33:17 +0000 |
commit | c176492ec7c149a081585ad52416cdc630edab81 (patch) | |
tree | 9ac61bafb7431d442ea614f93332fa2fd8229fe2 /src/mongo/db/s/resharding | |
parent | 365c2667c56b7cddb7ef0c69e9440794e3d84c09 (diff) | |
download | mongo-c176492ec7c149a081585ad52416cdc630edab81.tar.gz |
Revert "SERVER-63441 Handle retryable internal transactions with multiple oplog entries when migrating sessions during resharding"
This reverts commit 97d47a47ed9629ae4206a7b8e20be61aef8d17ec.
Diffstat (limited to 'src/mongo/db/s/resharding')
9 files changed, 148 insertions, 534 deletions
diff --git a/src/mongo/db/s/resharding/resharding_data_copy_util.cpp b/src/mongo/db/s/resharding/resharding_data_copy_util.cpp index 8be9fbb4669..e000d42f5ae 100644 --- a/src/mongo/db/s/resharding/resharding_data_copy_util.cpp +++ b/src/mongo/db/s/resharding/resharding_data_copy_util.cpp @@ -307,16 +307,13 @@ void updateSessionRecord(OperationContext* opCtx, auto txnParticipant = TransactionParticipant::get(opCtx); invariant(txnParticipant, "Must be called with session checked out"); - const auto sessionId = *opCtx->getLogicalSessionId(); - const auto txnNumber = *opCtx->getTxnNumber(); - repl::MutableOplogEntry oplogEntry; oplogEntry.setOpType(repl::OpTypeEnum::kNoop); oplogEntry.setObject(SessionCatalogMigration::kSessionOplogTag); oplogEntry.setObject2(std::move(o2Field)); oplogEntry.setNss({}); - oplogEntry.setSessionId(sessionId); - oplogEntry.setTxnNumber(txnNumber); + oplogEntry.setSessionId(opCtx->getLogicalSessionId()); + oplogEntry.setTxnNumber(opCtx->getTxnNumber()); oplogEntry.setStatementIds(stmtIds); oplogEntry.setPreImageOpTime(std::move(preImageOpTime)); oplogEntry.setPostImageOpTime(std::move(postImageOpTime)); @@ -324,33 +321,37 @@ void updateSessionRecord(OperationContext* opCtx, oplogEntry.setWallClockTime(opCtx->getServiceContext()->getFastClockSource()->now()); oplogEntry.setFromMigrate(true); - writeConflictRetry( - opCtx, - "resharding::data_copy::updateSessionRecord", - NamespaceString::kSessionTransactionsTableNamespace.ns(), - [&] { - AutoGetOplog oplogWrite(opCtx, OplogAccessMode::kWrite); - - WriteUnitOfWork wuow(opCtx); - repl::OpTime opTime = repl::logOp(opCtx, &oplogEntry); - - uassert(4989901, - str::stream() << "Failed to create new oplog entry: " - << redact(oplogEntry.toBSON()), - !opTime.isNull()); - - // Use the same wallTime as the oplog entry since SessionUpdateTracker - // looks at the oplog entry wallTime when replicating. - SessionTxnRecord sessionTxnRecord( - sessionId, txnNumber, std::move(opTime), oplogEntry.getWallClockTime()); - if (isInternalSessionForRetryableWrite(sessionId)) { - sessionTxnRecord.setParentSessionId(*getParentSessionId(sessionId)); - } - - txnParticipant.onRetryableWriteCloningCompleted(opCtx, stmtIds, sessionTxnRecord); - - wuow.commit(); - }); + writeConflictRetry(opCtx, + "resharding::data_copy::updateSessionRecord", + NamespaceString::kSessionTransactionsTableNamespace.ns(), + [&] { + AutoGetOplog oplogWrite(opCtx, OplogAccessMode::kWrite); + + WriteUnitOfWork wuow(opCtx); + repl::OpTime opTime = repl::logOp(opCtx, &oplogEntry); + + uassert(4989901, + str::stream() << "Failed to create new oplog entry: " + << redact(oplogEntry.toBSON()), + !opTime.isNull()); + + // Use the same wallTime as the oplog entry since SessionUpdateTracker + // looks at the oplog entry wallTime when replicating. + SessionTxnRecord sessionTxnRecord(*oplogEntry.getSessionId(), + *oplogEntry.getTxnNumber(), + std::move(opTime), + oplogEntry.getWallClockTime()); + + if (isInternalSessionForRetryableWrite(*oplogEntry.getSessionId())) { + sessionTxnRecord.setParentSessionId( + *getParentSessionId(*oplogEntry.getSessionId())); + } + + txnParticipant.onRetryableWriteCloningCompleted( + opCtx, stmtIds, sessionTxnRecord); + + wuow.commit(); + }); } } // namespace mongo::resharding::data_copy diff --git a/src/mongo/db/s/resharding/resharding_oplog_applier.cpp b/src/mongo/db/s/resharding/resharding_oplog_applier.cpp index 452cef3ea9a..b65d5376689 100644 --- a/src/mongo/db/s/resharding/resharding_oplog_applier.cpp +++ b/src/mongo/db/s/resharding/resharding_oplog_applier.cpp @@ -78,8 +78,8 @@ SemiFuture<void> ReshardingOplogApplier::_applyBatch( CancellationToken cancelToken, CancelableOperationContextFactory factory) { Timer latencyTimer; - auto crudWriterVectors = _batchPreparer.makeCrudOpWriterVectors( - _currentBatchToApply, _currentDerivedOpsForCrudWriters); + auto crudWriterVectors = + _batchPreparer.makeCrudOpWriterVectors(_currentBatchToApply, _currentDerivedOps); CancellationSource errorSource(cancelToken); @@ -99,8 +99,7 @@ SemiFuture<void> ReshardingOplogApplier::_applyBatch( } } - auto sessionWriterVectors = _batchPreparer.makeSessionOpWriterVectors( - _currentBatchToApply, _currentDerivedOpsForSessionWriters); + auto sessionWriterVectors = _batchPreparer.makeSessionOpWriterVectors(_currentBatchToApply); batchApplierFutures.reserve(crudWriterVectors.size() + sessionWriterVectors.size()); for (auto&& writer : sessionWriterVectors) { @@ -153,8 +152,7 @@ SemiFuture<void> ReshardingOplogApplier::run( "reshardingApplyOplogBatchTwice failpoint enabled, applying batch " "a second time", "batchSize"_attr = _currentBatchToApply.size()); - _currentDerivedOpsForCrudWriters.clear(); - _currentDerivedOpsForSessionWriters.clear(); + _currentDerivedOps.clear(); return _applyBatch(executor, cancelToken, factory); } return SemiFuture<void>(); @@ -242,8 +240,7 @@ void ReshardingOplogApplier::_clearAppliedOpsAndStoreProgress(OperationContext* } _currentBatchToApply.clear(); - _currentDerivedOpsForCrudWriters.clear(); - _currentDerivedOpsForSessionWriters.clear(); + _currentDerivedOps.clear(); } NamespaceString ReshardingOplogApplier::ensureStashCollectionExists( diff --git a/src/mongo/db/s/resharding/resharding_oplog_applier.h b/src/mongo/db/s/resharding/resharding_oplog_applier.h index f7835c2ee64..0c3a8bc8306 100644 --- a/src/mongo/db/s/resharding/resharding_oplog_applier.h +++ b/src/mongo/db/s/resharding/resharding_oplog_applier.h @@ -146,8 +146,7 @@ private: OplogBatch _currentBatchToApply; // Buffer for internally generated oplog entries that needs to be processed for this batch. - std::list<repl::OplogEntry> _currentDerivedOpsForCrudWriters; - std::list<repl::OplogEntry> _currentDerivedOpsForSessionWriters; + std::list<repl::OplogEntry> _currentDerivedOps; // The source of the oplog entries to be applied. std::unique_ptr<ReshardingDonorOplogIteratorInterface> _oplogIter; diff --git a/src/mongo/db/s/resharding/resharding_oplog_batch_preparer.cpp b/src/mongo/db/s/resharding/resharding_oplog_batch_preparer.cpp index bde8901926a..a6f73b01cec 100644 --- a/src/mongo/db/s/resharding/resharding_oplog_batch_preparer.cpp +++ b/src/mongo/db/s/resharding/resharding_oplog_batch_preparer.cpp @@ -50,7 +50,8 @@ namespace { * Return true if we need to update config.transactions collection for this oplog entry. */ bool shouldUpdateTxnTable(const repl::OplogEntry& op) { - if (op.getCommandType() == repl::OplogEntry::CommandType::kAbortTransaction) { + if (op.getCommandType() == repl::OplogEntry::CommandType::kCommitTransaction || + op.getCommandType() == repl::OplogEntry::CommandType::kAbortTransaction) { return true; } @@ -63,21 +64,8 @@ bool shouldUpdateTxnTable(const repl::OplogEntry& op) { } if (op.getCommandType() == repl::OplogEntry::CommandType::kApplyOps) { - // This applyOps oplog entry is guaranteed to correspond to a committed transaction since - // the resharding aggregation pipeline does not output applyOps oplog entries for aborted - // transactions (i.e. it only outputs the abortTransaction oplog entry). - - if (isInternalSessionForRetryableWrite(*op.getSessionId())) { - // For a retryable internal transaction, we need to update the config.transactions - // collection upon writing the noop oplog entries for retryable operations contained - // within each applyOps oplog entry. - return true; - } - - // The resharding aggregation pipeline also does not output the commitTransaction oplog - // entry so for a non-retryable transaction, we need to the update to the - // config.transactions collection upon seeing the final applyOps oplog entry. - return !op.isPartialTransaction(); + auto applyOpsInfo = repl::ApplyOpsCommandInfo::parse(op.getObject()); + return !applyOpsInfo.getPrepare() && !applyOpsInfo.getPartialTxn(); } return false; @@ -128,8 +116,6 @@ WriterVectors ReshardingOplogBatchPreparer::makeCrudOpWriterVectors( } auto applyOpsInfo = repl::ApplyOpsCommandInfo::parse(op.getObject()); - // TODO (SERVER-63880): Make resharding handle applyOps oplog entries with - // WouldChangeOwningShard sentinel noop entry. uassert( ErrorCodes::OplogOperationUnsupported, str::stream() << "Commands within applyOps are not supported during resharding: " @@ -162,7 +148,7 @@ WriterVectors ReshardingOplogBatchPreparer::makeCrudOpWriterVectors( } WriterVectors ReshardingOplogBatchPreparer::makeSessionOpWriterVectors( - const OplogBatchToPrepare& batch, std::list<OplogEntry>& derivedOps) const { + const OplogBatchToPrepare& batch) const { auto writerVectors = _makeEmptyWriterVectors(); struct SessionOpsList { @@ -202,52 +188,7 @@ WriterVectors ReshardingOplogBatchPreparer::makeSessionOpWriterVectors( } else if (op.isCommand()) { throwIfUnsupportedCommandOp(op); - if (!shouldUpdateTxnTable(op)) { - continue; - } - - auto sessionId = *op.getSessionId(); - - if (isInternalSessionForRetryableWrite(sessionId) && - op.getCommandType() == OplogEntry::CommandType::kApplyOps) { - // Derive retryable write CRUD oplog entries from this retryable internal - // transaction applyOps oplog entry. - - auto applyOpsInfo = repl::ApplyOpsCommandInfo::parse(op.getObject()); - // TODO (SERVER-63880): Make resharding handle applyOps oplog entries with - // WouldChangeOwningShard sentinel noop entry. - uassert(ErrorCodes::OplogOperationUnsupported, - str::stream() - << "Commands within applyOps are not supported during resharding: " - << redact(op.toBSONForLogging()), - applyOpsInfo.areOpsCrudOnly()); - - auto unrolledOp = - uassertStatusOK(repl::MutableOplogEntry::parse(op.getEntry().toBSON())); - unrolledOp.setSessionId(*getParentSessionId(sessionId)); - unrolledOp.setTxnNumber(*sessionId.getTxnNumber()); - - for (const auto& innerOp : applyOpsInfo.getOperations()) { - auto replOp = repl::ReplOperation::parse( - {"ReshardingOplogBatchPreparer::makeSessionOpWriterVectors innerOp"}, - innerOp); - if (replOp.getStatementIds().empty()) { - // Skip this operation since it is not retryable. - continue; - } - unrolledOp.setDurableReplOperation(replOp); - - // There isn't a direct way to convert from a MutableOplogEntry to a - // DurableOplogEntry or OplogEntry. We serialize the unrolledOp to have it get - // re-parsed into an OplogEntry. - auto& derivedOp = derivedOps.emplace_back(unrolledOp.toBSON()); - invariant(derivedOp.isCrudOpType()); - - // `&derivedOp` is guaranteed to remain stable while we append more derived - // oplog entries because `derivedOps` is a std::list. - updateSessionTracker(&derivedOp); - } - } else { + if (shouldUpdateTxnTable(op)) { updateSessionTracker(&op); } } else { diff --git a/src/mongo/db/s/resharding/resharding_oplog_batch_preparer.h b/src/mongo/db/s/resharding/resharding_oplog_batch_preparer.h index 27e7934b326..f4f035ccdbd 100644 --- a/src/mongo/db/s/resharding/resharding_oplog_batch_preparer.h +++ b/src/mongo/db/s/resharding/resharding_oplog_batch_preparer.h @@ -82,13 +82,11 @@ public: * to the config.transactions record for a higher txnNumber will cause any updates in `batch` * for lower txnNumbers to be elided. * - * The returned writer vectors refer to memory owned by `batch` and `derivedOps`. The caller - * must take care to ensure both `batch` and `derivedOps` outlive the writer vectors all being - * applied and must take care not to modify `batch` or `derivedOps` until after the writer - * vectors have all been applied. + * The returned writer vectors refer to memory owned by `batch`. The caller must take care to + * ensure `batch` outlives the writer vectors all being applied and must take care not to modify + * `batch` until after the writer vectors have all been applied. */ - WriterVectors makeSessionOpWriterVectors(const OplogBatchToPrepare& batch, - std::list<OplogEntry>& derivedOps) const; + WriterVectors makeSessionOpWriterVectors(const OplogBatchToPrepare& batch) const; static void throwIfUnsupportedCommandOp(const OplogEntry& op); diff --git a/src/mongo/db/s/resharding/resharding_oplog_batch_preparer_test.cpp b/src/mongo/db/s/resharding/resharding_oplog_batch_preparer_test.cpp index c98104fcf3a..07ae756eeae 100644 --- a/src/mongo/db/s/resharding/resharding_oplog_batch_preparer_test.cpp +++ b/src/mongo/db/s/resharding/resharding_oplog_batch_preparer_test.cpp @@ -31,7 +31,6 @@ #include <boost/optional/optional_io.hpp> -#include "mongo/db/logical_session_id_helpers.h" #include "mongo/db/query/collation/collator_interface.h" #include "mongo/db/s/resharding/resharding_oplog_batch_preparer.h" #include "mongo/db/s/resharding/resharding_server_parameters_gen.h" @@ -67,42 +66,25 @@ protected: return {op.toBSON()}; } - /** - * Returns an applyOps oplog entry containing insert operations for the given documents. If the - * session is an internal session for retryable writes, uses the "_id" of each document as its - * statement id. - */ - repl::OplogEntry makeApplyOpsForInsert(const std::vector<BSONObj> documents, - boost::optional<LogicalSessionId> lsid = boost::none, - boost::optional<TxnNumber> txnNumber = boost::none, - boost::optional<bool> isPrepare = boost::none, - boost::optional<bool> isPartial = boost::none) { - BSONObjBuilder applyOpsBuilder; + repl::OplogEntry makeApplyOps(BSONObj document, + bool isPrepare, + bool isPartial, + boost::optional<LogicalSessionId> lsid, + boost::optional<TxnNumber> txnNumber) { - BSONArrayBuilder opsArrayBuilder = applyOpsBuilder.subarrayStart("applyOps"); - for (const auto& document : documents) { - auto insertOp = repl::DurableReplOperation(repl::OpTypeEnum::kInsert, {}, document); - if (lsid && isInternalSessionForRetryableWrite(*lsid)) { - if (!document.hasField("_id")) { - continue; - } - auto id = document.getIntField("_id"); - insertOp.setStatementIds({{id}}); - } - opsArrayBuilder.append(insertOp.toBSON()); - } - opsArrayBuilder.done(); + std::vector<mongo::BSONObj> operations; + auto insertOp = repl::MutableOplogEntry::makeInsertOperation( + NamespaceString("foo.bar"), UUID::gen(), document, document); + + BSONObjBuilder applyOpsBuilder; + applyOpsBuilder.append("applyOps", BSON_ARRAY(insertOp.toBSON())); if (isPrepare) { - invariant(lsid); - invariant(txnNumber); - applyOpsBuilder.append(repl::ApplyOpsCommandInfoBase::kPrepareFieldName, *isPrepare); + applyOpsBuilder.append(repl::ApplyOpsCommandInfoBase::kPrepareFieldName, true); } if (isPartial) { - invariant(lsid); - invariant(txnNumber); - applyOpsBuilder.append(repl::ApplyOpsCommandInfoBase::kPartialTxnFieldName, *isPartial); + applyOpsBuilder.append(repl::ApplyOpsCommandInfoBase::kPartialTxnFieldName, true); } repl::MutableOplogEntry op; @@ -212,13 +194,24 @@ TEST_F(ReshardingOplogBatchPreparerTest, CreatesDerivedCrudOpsForApplyOps) { // We use the "fromApplyOps" field in the document to distinguish between the regular oplog // entries from the derived ones later on. int numOps = 20; - std::vector<BSONObj> docsForApplyOps; for (int i = 0; i < numOps; ++i) { batch.emplace_back(makeUpdateOp(BSON("_id" << i << "fromApplyOps" << false))); - docsForApplyOps.push_back(BSON("_id" << i << "fromApplyOps" << true)); } - batch.emplace_back(makeApplyOpsForInsert(docsForApplyOps)); + BSONObjBuilder applyOpsBuilder; + { + BSONArrayBuilder opsArrayBuilder = applyOpsBuilder.subarrayStart("applyOps"); + for (int i = 0; i < numOps; ++i) { + // We use OpTypeEnum::kInsert rather than OpTypeEnum::kUpdate here to avoid needing to + // deal with setting the 'o2' field. + opsArrayBuilder.append( + repl::DurableReplOperation( + repl::OpTypeEnum::kInsert, {}, BSON("_id" << i << "fromApplyOps" << true)) + .toBSON()); + } + } + + batch.emplace_back(makeCommandOp(applyOpsBuilder.done())); std::list<repl::OplogEntry> derivedOps; auto writerVectors = _batchPreparer.makeCrudOpWriterVectors(batch, derivedOps); @@ -261,8 +254,12 @@ TEST_F(ReshardingOplogBatchPreparerTest, InterleavesDerivedCrudOpsForApplyOps) { } else { // We use OpTypeEnum::kInsert rather than OpTypeEnum::kUpdate here to avoid needing to // deal with setting the 'o2' field. - batch.emplace_back( - makeApplyOpsForInsert({BSON("_id" << 0 << "n" << i << "fromApplyOps" << true)})); + batch.emplace_back(makeCommandOp(BSON( + "applyOps" << BSON_ARRAY(repl::DurableReplOperation( + repl::OpTypeEnum::kInsert, + {}, + BSON("_id" << 0 << "n" << i << "fromApplyOps" << true)) + .toBSON())))); } } @@ -293,10 +290,8 @@ TEST_F(ReshardingOplogBatchPreparerTest, AssignsSessionOpsToWriterVectorsByLsid) batch.emplace_back(makeUpdateOp(BSON("_id" << i), lsid, TxnNumber{1})); } - std::list<repl::OplogEntry> derivedOps; - auto writerVectors = _batchPreparer.makeSessionOpWriterVectors(batch, derivedOps); + auto writerVectors = _batchPreparer.makeSessionOpWriterVectors(batch); ASSERT_EQ(writerVectors.size(), kNumWriterVectors); - ASSERT_EQ(derivedOps.size(), 0U); auto writer = getNonEmptyWriterVector(writerVectors); ASSERT_EQ(writer.size(), numOps); @@ -316,10 +311,8 @@ TEST_F(ReshardingOplogBatchPreparerTest, DiscardsLowerTxnNumberSessionOps) { batch.emplace_back(makeUpdateOp(BSON("_id" << i), lsid, TxnNumber{i})); } - std::list<repl::OplogEntry> derivedOps; - auto writerVectors = _batchPreparer.makeSessionOpWriterVectors(batch, derivedOps); + auto writerVectors = _batchPreparer.makeSessionOpWriterVectors(batch); ASSERT_EQ(writerVectors.size(), kNumWriterVectors); - ASSERT_EQ(derivedOps.size(), 0U); auto writer = getNonEmptyWriterVector(writerVectors); ASSERT_EQ(writer.size(), 1U); @@ -337,8 +330,7 @@ TEST_F(ReshardingOplogBatchPreparerTest, DistributesSessionOpsToWriterVectorsFai makeUpdateOp(BSON("_id" << i), makeLogicalSessionIdForTest(), TxnNumber{1})); } - std::list<repl::OplogEntry> derivedOps; - auto writerVectors = _batchPreparer.makeSessionOpWriterVectors(batch, derivedOps); + auto writerVectors = _batchPreparer.makeSessionOpWriterVectors(batch); ASSERT_EQ(writerVectors.size(), kNumWriterVectors); // Use `numOps / 5` as a generous definition for "fair". There's no guarantee for how the lsid @@ -365,319 +357,107 @@ TEST_F(ReshardingOplogBatchPreparerTest, ThrowsForUnsupportedCommandOps) { batch.emplace_back(makeCommandOp(BSON("commitIndexBuild" << 1))); std::list<repl::OplogEntry> derivedOps; - ASSERT_THROWS_CODE(_batchPreparer.makeSessionOpWriterVectors(batch, derivedOps), + ASSERT_THROWS_CODE(_batchPreparer.makeSessionOpWriterVectors(batch), DBException, ErrorCodes::OplogOperationUnsupported); } } TEST_F(ReshardingOplogBatchPreparerTest, DiscardsNoops) { - auto runTest = [&](const boost::optional<LogicalSessionId>& lsid, - const boost::optional<TxnNumber>& txnNumber) { - OplogBatch batch; + OplogBatch batch; - int numOps = 5; - for (int i = 0; i < numOps; ++i) { - repl::MutableOplogEntry op; - op.setSessionId(lsid); - op.setTxnNumber(txnNumber); - op.setOpType(repl::OpTypeEnum::kNoop); - op.setObject({}); - op.setNss({}); - op.setOpTime({{}, {}}); - op.setWallClockTime({}); - batch.emplace_back(op.toBSON()); - } + int numOps = 5; + for (int i = 0; i < numOps; ++i) { + repl::MutableOplogEntry op; + op.setOpType(repl::OpTypeEnum::kNoop); + op.setObject({}); + op.setNss({}); + op.setOpTime({{}, {}}); + op.setWallClockTime({}); + batch.emplace_back(op.toBSON()); + } + + std::list<repl::OplogEntry> derivedOps; + auto writerVectors = _batchPreparer.makeCrudOpWriterVectors(batch, derivedOps); + ASSERT_EQ(writerVectors.size(), kNumWriterVectors); + ASSERT_EQ(derivedOps.size(), 0U); + ASSERT_EQ(writerVectors[0].size(), 0U); + ASSERT_EQ(writerVectors[1].size(), 0U); - std::list<repl::OplogEntry> derivedOpsForCrudWriters; - auto writerVectors = - _batchPreparer.makeCrudOpWriterVectors(batch, derivedOpsForCrudWriters); - ASSERT_EQ(writerVectors.size(), kNumWriterVectors); - ASSERT_EQ(derivedOpsForCrudWriters.size(), 0U); - ASSERT_EQ(writerVectors[0].size(), 0U); - ASSERT_EQ(writerVectors[1].size(), 0U); - - std::list<repl::OplogEntry> derivedOpsForSessionWriters; - writerVectors = - _batchPreparer.makeSessionOpWriterVectors(batch, derivedOpsForSessionWriters); - ASSERT_EQ(writerVectors.size(), kNumWriterVectors); - ASSERT_EQ(derivedOpsForSessionWriters.size(), 0U); - ASSERT_EQ(writerVectors[0].size(), 0U); - ASSERT_EQ(writerVectors[1].size(), 0U); - }; - - runTest(boost::none, boost::none); - - TxnNumber txnNumber{1}; - runTest(makeLogicalSessionIdForTest(), txnNumber); - runTest(makeLogicalSessionIdWithTxnUUIDForTest(), txnNumber); - runTest(makeLogicalSessionIdWithTxnNumberAndUUIDForTest(), txnNumber); + writerVectors = _batchPreparer.makeSessionOpWriterVectors(batch); + ASSERT_EQ(writerVectors.size(), kNumWriterVectors); + ASSERT_EQ(writerVectors[0].size(), 0U); + ASSERT_EQ(writerVectors[1].size(), 0U); } TEST_F(ReshardingOplogBatchPreparerTest, SessionWriteVectorsForApplyOpsWithoutTxnNumber) { OplogBatch batch; - batch.emplace_back(makeApplyOpsForInsert({BSON("_id" << 0)})); + batch.emplace_back(makeApplyOps(BSON("_id" << 3), false, false, boost::none, boost::none)); - std::list<repl::OplogEntry> derivedOps; - auto writerVectors = _batchPreparer.makeSessionOpWriterVectors(batch, derivedOps); - ASSERT_EQ(derivedOps.size(), 0U); + auto writerVectors = _batchPreparer.makeSessionOpWriterVectors(batch); for (const auto& writer : writerVectors) { ASSERT_TRUE(writer.empty()); } } - -TEST_F(ReshardingOplogBatchPreparerTest, - SessionWriteVectorsDeriveCrudOpsForApplyOpsForRetryableInternalTransaction) { - const auto lsid = makeLogicalSessionIdWithTxnNumberAndUUIDForTest(); - const TxnNumber txnNumber{1}; - +TEST_F(ReshardingOplogBatchPreparerTest, SessionWriteVectorsForSmallUnpreparedTxn) { OplogBatch batch; - // 'makeApplyOpsForInsert' uses the "_id" of each document as the "stmtId" for its insert - // operation. The insert operation without a stmtId should not have a derived operation. - batch.emplace_back(makeApplyOpsForInsert({BSON("_id" << 0), BSONObj(), BSON("_id" << 1)}, - lsid, - txnNumber, - false /* isPrepare */, - false /* isPartial */)); + auto lsid = makeLogicalSessionIdForTest(); - std::list<repl::OplogEntry> derivedOps; - auto writerVectors = _batchPreparer.makeSessionOpWriterVectors(batch, derivedOps); + batch.emplace_back(makeApplyOps(BSON("_id" << 3), false, false, lsid, 2)); + + auto writerVectors = _batchPreparer.makeSessionOpWriterVectors(batch); ASSERT_FALSE(writerVectors.empty()); auto writer = getNonEmptyWriterVector(writerVectors); - - ASSERT_EQ(writer.size(), 2U); - ASSERT_EQ(derivedOps.size(), 2U); - for (size_t i = 0; i < writer.size(); ++i) { - ASSERT_EQ(writer[i]->getSessionId(), *getParentSessionId(lsid)); - ASSERT_EQ(*writer[i]->getTxnNumber(), *lsid.getTxnNumber()); - ASSERT(writer[i]->getOpType() == repl::OpTypeEnum::kInsert); - ASSERT_BSONOBJ_EQ(writer[i]->getObject(), (BSON("_id" << static_cast<int>(i)))); - } -} - -TEST_F(ReshardingOplogBatchPreparerTest, SessionWriteVectorsForSmallUnpreparedTxn) { - auto runTest = [&](const LogicalSessionId& lsid) { - const TxnNumber txnNumber{1}; - - OplogBatch batch; - batch.emplace_back(makeApplyOpsForInsert({BSON("_id" << 0), BSON("_id" << 1)}, - lsid, - txnNumber, - false /* isPrepare */, - false /* isPartial */)); - - std::list<repl::OplogEntry> derivedOps; - auto writerVectors = _batchPreparer.makeSessionOpWriterVectors(batch, derivedOps); - ASSERT_FALSE(writerVectors.empty()); - - auto writer = getNonEmptyWriterVector(writerVectors); - - if (isInternalSessionForRetryableWrite(lsid)) { - ASSERT_EQ(writer.size(), 2U); - ASSERT_EQ(derivedOps.size(), 2U); - for (size_t i = 0; i < writer.size(); ++i) { - ASSERT_EQ(writer[i]->getSessionId(), *getParentSessionId(lsid)); - ASSERT_EQ(*writer[i]->getTxnNumber(), *lsid.getTxnNumber()); - ASSERT(writer[i]->getOpType() == repl::OpTypeEnum::kInsert); - ASSERT_BSONOBJ_EQ(writer[i]->getObject(), (BSON("_id" << static_cast<int>(i)))); - } - } else { - ASSERT_EQ(writer.size(), 1U); - ASSERT_EQ(derivedOps.size(), 0U); - ASSERT_EQ(writer[0]->getSessionId(), lsid); - ASSERT_EQ(*writer[0]->getTxnNumber(), txnNumber); - ASSERT(writer[0]->getCommandType() == repl::OplogEntry::CommandType::kApplyOps); - } - }; - - runTest(makeLogicalSessionIdForTest()); - runTest(makeLogicalSessionIdWithTxnUUIDForTest()); - runTest(makeLogicalSessionIdWithTxnNumberAndUUIDForTest()); -} - -TEST_F(ReshardingOplogBatchPreparerTest, SessionWriteVectorsForLargeUnpreparedTxn) { - auto runTest = [&](const LogicalSessionId& lsid) { - const TxnNumber txnNumber{1}; - - OplogBatch batch; - batch.emplace_back(makeApplyOpsForInsert({BSON("_id" << 0), BSON("_id" << 1)}, - lsid, - txnNumber, - false /* isPrepare */, - true /* isPartial */)); - batch.emplace_back(makeApplyOpsForInsert( - {BSON("_id" << 2)}, lsid, txnNumber, false /* isPrepare */, false /* isPartial */)); - - std::list<repl::OplogEntry> derivedOps; - auto writerVectors = _batchPreparer.makeSessionOpWriterVectors(batch, derivedOps); - ASSERT_FALSE(writerVectors.empty()); - - auto writer = getNonEmptyWriterVector(writerVectors); - - if (isInternalSessionForRetryableWrite(lsid)) { - ASSERT_EQ(writer.size(), 3U); - ASSERT_EQ(derivedOps.size(), 3U); - for (size_t i = 0; i < writer.size(); ++i) { - ASSERT_EQ(writer[i]->getSessionId(), *getParentSessionId(lsid)); - ASSERT_EQ(*writer[i]->getTxnNumber(), *lsid.getTxnNumber()); - ASSERT(writer[i]->getOpType() == repl::OpTypeEnum::kInsert); - ASSERT_BSONOBJ_EQ(writer[i]->getObject(), (BSON("_id" << static_cast<int>(i)))); - } - } else { - ASSERT_EQ(writer.size(), 1U); - ASSERT_EQ(derivedOps.size(), 0U); - ASSERT_EQ(writer[0]->getSessionId(), lsid); - ASSERT_EQ(*writer[0]->getTxnNumber(), txnNumber); - ASSERT(writer[0]->getCommandType() == repl::OplogEntry::CommandType::kApplyOps); - } - }; - - runTest(makeLogicalSessionIdForTest()); - runTest(makeLogicalSessionIdWithTxnUUIDForTest()); - runTest(makeLogicalSessionIdWithTxnNumberAndUUIDForTest()); -} - -TEST_F(ReshardingOplogBatchPreparerTest, SessionWriteVectorsForSmallCommittedPreparedTxn) { - auto runTest = [&](const LogicalSessionId& lsid) { - const TxnNumber txnNumber{1}; - - OplogBatch batch; - batch.emplace_back(makeApplyOpsForInsert({BSON("_id" << 0), BSON("_id" << 1)}, - lsid, - txnNumber, - true /* isPrepare */, - false /* isPartial */)); - batch.emplace_back(makeCommandOp(BSON("commitTransaction" << 1), lsid, txnNumber)); - - std::list<repl::OplogEntry> derivedOps; - auto writerVectors = _batchPreparer.makeSessionOpWriterVectors(batch, derivedOps); - ASSERT_FALSE(writerVectors.empty()); - - auto writer = getNonEmptyWriterVector(writerVectors); - - if (isInternalSessionForRetryableWrite(lsid)) { - ASSERT_EQ(writer.size(), 2U); - ASSERT_EQ(derivedOps.size(), 2U); - for (size_t i = 0; i < writer.size(); ++i) { - ASSERT_EQ(writer[i]->getSessionId(), *getParentSessionId(lsid)); - ASSERT_EQ(*writer[i]->getTxnNumber(), *lsid.getTxnNumber()); - ASSERT(writer[i]->getOpType() == repl::OpTypeEnum::kInsert); - ASSERT_BSONOBJ_EQ(writer[i]->getObject(), (BSON("_id" << static_cast<int>(i)))); - } - } else { - ASSERT_EQ(writer.size(), 1U); - ASSERT_EQ(derivedOps.size(), 0U); - ASSERT_EQ(writer[0]->getSessionId(), lsid); - ASSERT_EQ(*writer[0]->getTxnNumber(), txnNumber); - ASSERT(writer[0]->getCommandType() == repl::OplogEntry::CommandType::kApplyOps); - } - }; - - runTest(makeLogicalSessionIdForTest()); - runTest(makeLogicalSessionIdWithTxnUUIDForTest()); - runTest(makeLogicalSessionIdWithTxnNumberAndUUIDForTest()); + ASSERT_EQ(writer.size(), 1U); + ASSERT_EQ(writer[0]->getSessionId(), lsid); + ASSERT_EQ(*writer[0]->getTxnNumber(), 2); } -TEST_F(ReshardingOplogBatchPreparerTest, SessionWriteVectorsForLargeCommittedPreparedTxn) { - auto runTest = [&](const LogicalSessionId& lsid) { - const TxnNumber txnNumber{1}; +TEST_F(ReshardingOplogBatchPreparerTest, SessionWriteVectorsForCommittedTxn) { + OplogBatch batch; + auto lsid = makeLogicalSessionIdForTest(); - OplogBatch batch; - batch.emplace_back(makeApplyOpsForInsert({BSON("_id" << 0), BSON("_id" << 1)}, - lsid, - txnNumber, - false /* isPrepare */, - true /* isPartial */)); - batch.emplace_back(makeApplyOpsForInsert( - {BSON("_id" << 2)}, lsid, txnNumber, true /* isPrepare */, false /* isPartial */)); - batch.emplace_back(makeCommandOp(BSON("commitTransaction" << 1), lsid, txnNumber)); + batch.emplace_back(makeApplyOps(BSON("_id" << 3), true, false, lsid, 2)); + batch.emplace_back(makeCommandOp(BSON("commitTransaction" << 1), lsid, 2)); - std::list<repl::OplogEntry> derivedOps; - auto writerVectors = _batchPreparer.makeSessionOpWriterVectors(batch, derivedOps); - ASSERT_FALSE(writerVectors.empty()); - - auto writer = getNonEmptyWriterVector(writerVectors); - - if (isInternalSessionForRetryableWrite(lsid)) { - ASSERT_EQ(writer.size(), 3U); - ASSERT_EQ(derivedOps.size(), 3U); - for (size_t i = 0; i < writer.size(); ++i) { - ASSERT_EQ(writer[i]->getSessionId(), *getParentSessionId(lsid)); - ASSERT_EQ(*writer[i]->getTxnNumber(), *lsid.getTxnNumber()); - ASSERT(writer[i]->getOpType() == repl::OpTypeEnum::kInsert); - ASSERT_BSONOBJ_EQ(writer[i]->getObject(), (BSON("_id" << static_cast<int>(i)))); - } - } else { - ASSERT_EQ(writer.size(), 1U); - ASSERT_EQ(derivedOps.size(), 0U); - ASSERT_EQ(writer[0]->getSessionId(), lsid); - ASSERT_EQ(*writer[0]->getTxnNumber(), txnNumber); - ASSERT(writer[0]->getCommandType() == repl::OplogEntry::CommandType::kApplyOps); - } - }; + auto writerVectors = _batchPreparer.makeSessionOpWriterVectors(batch); + ASSERT_FALSE(writerVectors.empty()); - runTest(makeLogicalSessionIdForTest()); - runTest(makeLogicalSessionIdWithTxnUUIDForTest()); - runTest(makeLogicalSessionIdWithTxnNumberAndUUIDForTest()); + auto writer = getNonEmptyWriterVector(writerVectors); + ASSERT_EQ(writer.size(), 1U); + ASSERT_EQ(writer[0]->getSessionId(), lsid); + ASSERT_EQ(*writer[0]->getTxnNumber(), 2); } TEST_F(ReshardingOplogBatchPreparerTest, SessionWriteVectorsForAbortedPreparedTxn) { - auto runTest = [&](const LogicalSessionId& lsid) { - const TxnNumber txnNumber{1}; + OplogBatch batch; + auto lsid = makeLogicalSessionIdForTest(); - OplogBatch batch; - batch.emplace_back(makeCommandOp(BSON("abortTransaction" << 1), lsid, txnNumber)); + batch.emplace_back(makeCommandOp(BSON("abortTransaction" << 1), lsid, 2)); - std::list<repl::OplogEntry> derivedOps; - auto writerVectors = _batchPreparer.makeSessionOpWriterVectors(batch, derivedOps); - ASSERT_FALSE(writerVectors.empty()); - - auto writer = getNonEmptyWriterVector(writerVectors); - ASSERT_EQ(writer.size(), 1U); - ASSERT_EQ(derivedOps.size(), 0U); - ASSERT_EQ(writer[0]->getSessionId(), lsid); - ASSERT_EQ(*writer[0]->getTxnNumber(), txnNumber); - ASSERT(writer[0]->getCommandType() == repl::OplogEntry::CommandType::kAbortTransaction); - }; - - runTest(makeLogicalSessionIdForTest()); - runTest(makeLogicalSessionIdWithTxnUUIDForTest()); - runTest(makeLogicalSessionIdWithTxnNumberAndUUIDForTest()); + auto writerVectors = _batchPreparer.makeSessionOpWriterVectors(batch); + ASSERT_FALSE(writerVectors.empty()); + + auto writer = getNonEmptyWriterVector(writerVectors); + ASSERT_EQ(writer.size(), 1U); + ASSERT_EQ(writer[0]->getSessionId(), lsid); + ASSERT_EQ(*writer[0]->getTxnNumber(), 2); } TEST_F(ReshardingOplogBatchPreparerTest, SessionWriteVectorsForPartialUnpreparedTxn) { - auto runTest = [&](const LogicalSessionId& lsid) { - const TxnNumber txnNumber{1}; + OplogBatch batch; + auto lsid = makeLogicalSessionIdForTest(); - OplogBatch batch; - batch.emplace_back(makeApplyOpsForInsert( - {BSON("_id" << 0)}, lsid, txnNumber, false /* isPrepare */, true /* isPartial */)); + batch.emplace_back(makeApplyOps(BSON("_id" << 3), false, true, lsid, 2)); - std::list<repl::OplogEntry> derivedOps; - auto writerVectors = _batchPreparer.makeSessionOpWriterVectors(batch, derivedOps); - if (isInternalSessionForRetryableWrite(lsid)) { - ASSERT_FALSE(writerVectors.empty()); - auto writer = getNonEmptyWriterVector(writerVectors); - ASSERT_EQ(writer.size(), 1U); - ASSERT_EQ(derivedOps.size(), 1U); - ASSERT_EQ(writer[0]->getSessionId(), *getParentSessionId(lsid)); - ASSERT_EQ(*writer[0]->getTxnNumber(), *lsid.getTxnNumber()); - ASSERT(writer[0]->getOpType() == repl::OpTypeEnum::kInsert); - ASSERT_BSONOBJ_EQ(writer[0]->getObject(), (BSON("_id" << 0))); - } else { - ASSERT_EQ(derivedOps.size(), 0U); - for (const auto& writer : writerVectors) { - ASSERT_TRUE(writer.empty()); - } - } - }; + auto writerVectors = _batchPreparer.makeSessionOpWriterVectors(batch); - runTest(makeLogicalSessionIdForTest()); - runTest(makeLogicalSessionIdWithTxnUUIDForTest()); - runTest(makeLogicalSessionIdWithTxnNumberAndUUIDForTest()); + for (const auto& writer : writerVectors) { + ASSERT_TRUE(writer.empty()); + } } } // namespace diff --git a/src/mongo/db/s/resharding/resharding_oplog_session_application.cpp b/src/mongo/db/s/resharding/resharding_oplog_session_application.cpp index 123a740f119..5d6b635e9e4 100644 --- a/src/mongo/db/s/resharding/resharding_oplog_session_application.cpp +++ b/src/mongo/db/s/resharding/resharding_oplog_session_application.cpp @@ -78,29 +78,8 @@ repl::OpTime ReshardingOplogSessionApplication::_logPrePostImage( } boost::optional<SharedSemiFuture<void>> ReshardingOplogSessionApplication::tryApplyOperation( - OperationContext* opCtx, const mongo::repl::OplogEntry& op) const { - invariant(op.getSessionId()); - invariant(op.getTxnNumber()); - + OperationContext* opCtx, const repl::OplogEntry& op) const { auto lsid = *op.getSessionId(); - if (isInternalSessionForNonRetryableWrite(lsid)) { - // TODO (SERVER-63877): Determine if resharding should migrate internal sessions for - // non-retryable writes. - return boost::none; - } - if (isInternalSessionForRetryableWrite(lsid)) { - // The oplog preparer should have turned each applyOps oplog entry for a retryable internal - // transaction into retryable write CRUD oplog entries. - invariant(op.getCommandType() != repl::OplogEntry::CommandType::kApplyOps); - - if (op.getCommandType() == repl::OplogEntry::CommandType::kAbortTransaction) { - // Skip this oplog entry since there is no retryable write history to apply and writing - // a sentinel noop oplog entry would make retryable write statements that successfully - // executed outside of this internal transaction not retryable. - return boost::none; - } - } - auto txnNumber = *op.getTxnNumber(); bool isRetryableWrite = op.isCrudOpType(); @@ -109,7 +88,6 @@ boost::optional<SharedSemiFuture<void>> ReshardingOplogSessionApplication::tryAp auto stmtIds = isRetryableWrite ? op.getStatementIds() : std::vector<StmtId>{kIncompleteHistoryStmtId}; - invariant(!stmtIds.empty()); boost::optional<repl::OpTime> preImageOpTime; if (auto preImageOp = op.getPreImageOp()) { diff --git a/src/mongo/db/s/resharding/resharding_oplog_session_application_test.cpp b/src/mongo/db/s/resharding/resharding_oplog_session_application_test.cpp index 1116a41ec2a..6c57d2271a5 100644 --- a/src/mongo/db/s/resharding/resharding_oplog_session_application_test.cpp +++ b/src/mongo/db/s/resharding/resharding_oplog_session_application_test.cpp @@ -68,8 +68,6 @@ public: MongoDSessionCatalog::onStepUp(opCtx.get()); } - - serverGlobalParams.clusterRole = ClusterRole::ShardServer; } repl::OpTime insertSessionRecord(OperationContext* opCtx, @@ -949,66 +947,5 @@ TEST_F(ReshardingOplogSessionApplicationTest, IncomingTxnHasHigherTxnNumberThanP ASSERT_OK(hitPreparedTxn->getNoThrow()); } -TEST_F(ReshardingOplogSessionApplicationTest, IgnoreIncomingAbortedRetryableInternalTransaction) { - auto lsid = makeLogicalSessionIdWithTxnNumberAndUUIDForTest(); - - TxnNumber incomingTxnNumber = 100; - - auto opTime = [&] { - auto opCtx = makeOperationContext(); - return insertSessionRecord(opCtx.get(), makeLogicalSessionIdForTest(), 100, {3}); - }(); - - // 'makeFinishTxnOp' returns an abortTransaction oplog entry. - auto oplogEntry = makeFinishTxnOp(lsid, incomingTxnNumber); - - { - auto opCtx = makeOperationContext(); - ReshardingOplogSessionApplication applier; - auto hitPreparedTxn = applier.tryApplyOperation(opCtx.get(), oplogEntry); - ASSERT_FALSE(bool(hitPreparedTxn)); - } - - { - auto opCtx = makeOperationContext(); - auto foundOps = findOplogEntriesNewerThan(opCtx.get(), opTime.getTimestamp()); - ASSERT_EQ(foundOps.size(), 0U); - - auto sessionTxnRecord = findSessionRecord(opCtx.get(), lsid); - ASSERT_FALSE(bool(sessionTxnRecord)); - } -} - -TEST_F(ReshardingOplogSessionApplicationTest, IgnoreIncomingNonRetryableInternalTransaction) { - // TODO (SERVER-63877): Determine if resharding should migrate internal sessions for - // non-retryable writes. - auto lsid = makeLogicalSessionIdWithTxnUUIDForTest(); - - TxnNumber incomingTxnNumber = 100; - - auto opTime = [&] { - auto opCtx = makeOperationContext(); - return insertSessionRecord(opCtx.get(), makeLogicalSessionIdForTest(), 100, {3}); - }(); - - auto oplogEntry = makeFinishTxnOp(lsid, incomingTxnNumber); - - { - auto opCtx = makeOperationContext(); - ReshardingOplogSessionApplication applier; - auto hitPreparedTxn = applier.tryApplyOperation(opCtx.get(), oplogEntry); - ASSERT_FALSE(bool(hitPreparedTxn)); - } - - { - auto opCtx = makeOperationContext(); - auto foundOps = findOplogEntriesNewerThan(opCtx.get(), opTime.getTimestamp()); - ASSERT_EQ(foundOps.size(), 0U); - - auto sessionTxnRecord = findSessionRecord(opCtx.get(), lsid); - ASSERT_FALSE(bool(sessionTxnRecord)); - } -} - } // namespace } // namespace mongo diff --git a/src/mongo/db/s/resharding/resharding_txn_cloner.cpp b/src/mongo/db/s/resharding/resharding_txn_cloner.cpp index a6d9836a334..cfb088a7655 100644 --- a/src/mongo/db/s/resharding/resharding_txn_cloner.cpp +++ b/src/mongo/db/s/resharding/resharding_txn_cloner.cpp @@ -167,25 +167,8 @@ boost::optional<SessionTxnRecord> ReshardingTxnCloner::_getNextRecord(OperationC boost::optional<SharedSemiFuture<void>> ReshardingTxnCloner::doOneRecord( OperationContext* opCtx, const SessionTxnRecord& donorRecord) { - auto sessionId = donorRecord.getSessionId(); - auto txnNumber = donorRecord.getTxnNum(); - - if (isInternalSessionForNonRetryableWrite(sessionId)) { - // TODO (SERVER-63877): Determine if resharding should migrate internal sessions for - // non-retryable writes. - return boost::none; - } - - if (isInternalSessionForRetryableWrite(sessionId)) { - // Turn this into write history for the retryable write that this internal transaction - // corresponds to in order to avoid making retryable internal transactions have a sentinel - // noop oplog entry at all. - txnNumber = *sessionId.getTxnNumber(); - sessionId = *getParentSessionId(sessionId); - } - return resharding::data_copy::withSessionCheckedOut( - opCtx, sessionId, txnNumber, boost::none /* stmtId */, [&] { + opCtx, donorRecord.getSessionId(), donorRecord.getTxnNum(), boost::none /* stmtId */, [&] { resharding::data_copy::updateSessionRecord(opCtx, TransactionParticipant::kDeadEndSentinel, {kIncompleteHistoryStmtId}, |