diff options
author | Judah Schvimer <judah.schvimer@10gen.com> | 2019-10-11 17:57:43 +0000 |
---|---|---|
committer | evergreen <evergreen@mongodb.com> | 2019-10-11 17:57:43 +0000 |
commit | 20883d237ab0e4ee45c3aa6bc5f00602265402b1 (patch) | |
tree | d08d08979c909e7e45b2dd9bbba3483ff1652b63 | |
parent | 795f9bc3047184cc27f1643fa7c06bf2386f6218 (diff) | |
download | mongo-20883d237ab0e4ee45c3aa6bc5f00602265402b1.tar.gz |
SERVER-41437 minor transaction_oplog_application.js clean up
SERVER-41437 unify transaction oplog traversal
(cherry picked from commit 6dabe5871fcd280f654e475e7048385b54b1ea64)
(cherry picked from commit f14d9c4c07e69c2109de0af059123060c73cdd77)
-rw-r--r-- | src/mongo/db/repl/apply_ops.cpp | 13 | ||||
-rw-r--r-- | src/mongo/db/repl/apply_ops.h | 10 | ||||
-rw-r--r-- | src/mongo/db/repl/idempotency_test_fixture.cpp | 13 | ||||
-rw-r--r-- | src/mongo/db/repl/oplog_entry.h | 17 | ||||
-rw-r--r-- | src/mongo/db/repl/sync_tail.cpp | 99 | ||||
-rw-r--r-- | src/mongo/db/repl/sync_tail.h | 4 | ||||
-rw-r--r-- | src/mongo/db/repl/transaction_oplog_application.cpp | 321 | ||||
-rw-r--r-- | src/mongo/db/repl/transaction_oplog_application.h | 3 |
8 files changed, 232 insertions, 248 deletions
diff --git a/src/mongo/db/repl/apply_ops.cpp b/src/mongo/db/repl/apply_ops.cpp index 4f073418500..675573b8a38 100644 --- a/src/mongo/db/repl/apply_ops.cpp +++ b/src/mongo/db/repl/apply_ops.cpp @@ -494,14 +494,13 @@ Status applyOps(OperationContext* opCtx, // static MultiApplier::Operations ApplyOps::extractOperations(const OplogEntry& applyOpsOplogEntry) { MultiApplier::Operations result; - extractOperationsTo(applyOpsOplogEntry, applyOpsOplogEntry.toBSON(), &result, boost::none); + extractOperationsTo(applyOpsOplogEntry, applyOpsOplogEntry.toBSON(), &result); return result; } void ApplyOps::extractOperationsTo(const OplogEntry& applyOpsOplogEntry, const BSONObj& topLevelDoc, - MultiApplier::Operations* operations, - boost::optional<Timestamp> commitOplogEntryTS) { + MultiApplier::Operations* operations) { uassert(ErrorCodes::TypeMismatch, str::stream() << "ApplyOps::extractOperations(): not a command: " << redact(applyOpsOplogEntry.toBSON()), @@ -521,14 +520,8 @@ void ApplyOps::extractOperationsTo(const OplogEntry& applyOpsOplogEntry, for (const auto& elem : operationDocs) { auto operationDoc = elem.Obj(); - BSONObjBuilder builder(operationDoc); - - // Apply the ts field first if we have a commitOplogEntryTS so that appendElementsUnique - // will not overwrite this value. - if (commitOplogEntryTS) { - builder.append("ts", *commitOplogEntryTS); - } + BSONObjBuilder builder(operationDoc); builder.appendElementsUnique(topLevelDoc); auto operation = builder.obj(); diff --git a/src/mongo/db/repl/apply_ops.h b/src/mongo/db/repl/apply_ops.h index 8aac61a39b9..7489c94a486 100644 --- a/src/mongo/db/repl/apply_ops.h +++ b/src/mongo/db/repl/apply_ops.h @@ -55,16 +55,12 @@ public: /** * This variant allows optimization for extracting multiple applyOps operations. The entry for * the non-DurableReplOperation fields of the extracted operation must be specified as - * 'topLevelDoc', and need not be any of the applyOps operations. - * - * If a commitOplogEntryTS Timestamp is passed in, then we are extracting applyOps operations - * from a prepare oplog entry during initial sync. These operations must be timestamped at the - * commit oplog entry timestamp instead of the prepareTimestamp. + * 'topLevelDoc', and need not be any of the applyOps operations. The 'topLevelDoc' entry's + * 'ts' field will be used as the 'ts' field for each operation. */ static void extractOperationsTo(const OplogEntry& applyOpsOplogEntry, const BSONObj& topLevelDoc, - MultiApplier::Operations* operations, - boost::optional<Timestamp> commitOplogEntryTS); + MultiApplier::Operations* operations); }; /** diff --git a/src/mongo/db/repl/idempotency_test_fixture.cpp b/src/mongo/db/repl/idempotency_test_fixture.cpp index f6c26167f4a..b6cf0d36764 100644 --- a/src/mongo/db/repl/idempotency_test_fixture.cpp +++ b/src/mongo/db/repl/idempotency_test_fixture.cpp @@ -388,8 +388,17 @@ void IdempotencyTest::testOpsAreIdempotent(std::vector<OplogEntry> ops, Sequence SyncTailTest::makeInitialSyncOptions()); std::vector<MultiApplier::OperationPtrs> writerVectors(1); std::vector<MultiApplier::Operations> derivedOps; - // Derive ops for transactions if necessary. - syncTail.fillWriterVectors(_opCtx.get(), &ops, &writerVectors, &derivedOps); + + // Keeps all operations in scope for the lifetime of this function. + std::vector<MultiApplier::Operations> singleOpVectors; + for (auto&& entry : ops) { + // Derive ops for transactions if necessary. + std::vector<OplogEntry> op; + op.push_back(entry); + singleOpVectors.emplace_back(op); + syncTail.fillWriterVectors( + _opCtx.get(), &singleOpVectors.back(), &writerVectors, &derivedOps); + } const auto& opPtrs = writerVectors[0]; ASSERT_OK(runOpPtrsInitialSync(opPtrs)); diff --git a/src/mongo/db/repl/oplog_entry.h b/src/mongo/db/repl/oplog_entry.h index 0e222aa1bc8..a672e73f333 100644 --- a/src/mongo/db/repl/oplog_entry.h +++ b/src/mongo/db/repl/oplog_entry.h @@ -201,6 +201,23 @@ public: } /** + * Returns if this is a prepared 'commitTransaction' oplog entry. + */ + bool isPreparedCommit() const { + return getCommandType() == OplogEntry::CommandType::kCommitTransaction; + } + + /** + * Returns whether the oplog entry represents an applyOps which is a self-contained atomic + * operation, or the last applyOps of an unprepared transaction, as opposed to part of a + * prepared transaction or a non-final applyOps in a transaction. + */ + bool isTerminalApplyOps() const { + return getCommandType() == OplogEntry::CommandType::kApplyOps && !shouldPrepare() && + !isPartialTransaction() && !getObject().getBoolField("prepare"); + } + + /** * Returns if the oplog entry is for a CRUD operation. */ static bool isCrudOpType(OpTypeEnum opType); diff --git a/src/mongo/db/repl/sync_tail.cpp b/src/mongo/db/repl/sync_tail.cpp index 7efe5ee10b4..6938895799c 100644 --- a/src/mongo/db/repl/sync_tail.cpp +++ b/src/mongo/db/repl/sync_tail.cpp @@ -869,20 +869,6 @@ void SyncTail::_oplogApplication(ReplicationCoordinator* replCoord, } } -// Returns whether an oplog entry represents an applyOps which is a self-contained atomic operation, -// or the last applyOps of an unprepared transaction, as opposed to part of a prepared transaction -// or a non-final applyOps in an transaction. -inline bool isCommitApplyOps(const OplogEntry& entry) { - return entry.getCommandType() == OplogEntry::CommandType::kApplyOps && !entry.shouldPrepare() && - !entry.isPartialTransaction() && !entry.getObject().getBoolField("prepare"); -} - -// Returns whether a commitTransaction oplog entry is a part of a prepared transaction. -inline bool isPreparedCommit(const OplogEntry& entry) { - return entry.getCommandType() == OplogEntry::CommandType::kCommitTransaction; -} - - void SyncTail::shutdown() { stdx::lock_guard<stdx::mutex> lock(_mutex); _inShutdown = true; @@ -1147,7 +1133,7 @@ void SyncTail::_fillWriterVectors(OperationContext* opCtx, MultiApplier::Operations* ops, std::vector<MultiApplier::OperationPtrs>* writerVectors, std::vector<MultiApplier::Operations>* derivedOps, - SessionUpdateTracker* sessionUpdateTracker) { + SessionUpdateTracker* sessionUpdateTracker) noexcept { const auto serviceContext = opCtx->getServiceContext(); const auto storageEngine = serviceContext->getStorageEngine(); @@ -1182,7 +1168,9 @@ void SyncTail::_fillWriterVectors(OperationContext* opCtx, // If this entry is part of a multi-oplog-entry transaction, ignore it until the commit. // We must save it here because we are not guaranteed it has been written to the oplog // yet. - if (op.isPartialTransaction()) { + // We also do this for prepare during initial sync. + if (op.isPartialTransaction() || + (op.shouldPrepare() && _options.mode == OplogApplication::Mode::kInitialSync)) { auto& partialTxnList = partialTxnOps[*op.getSessionId()]; // If this operation belongs to an existing partial transaction, partialTxnList // must contain the previous operations of the transaction. @@ -1222,43 +1210,29 @@ void SyncTail::_fillWriterVectors(OperationContext* opCtx, // Extract applyOps operations and fill writers with extracted operations using this // function. - if (isCommitApplyOps(op)) { - try { - auto logicalSessionId = op.getSessionId(); - // applyOps entries generated by a transaction must have a sessionId and a - // transaction number. - if (logicalSessionId && op.getTxnNumber()) { - // On commit of unprepared transactions, get transactional operations from the - // oplog and fill writers with those operations. - // Flush partialTxnList operations for current transaction. - auto& partialTxnList = partialTxnOps[*logicalSessionId]; - { - // We need to use a ReadSourceScope avoid the reads of the transaction - // messing up the state of the opCtx. In particular we do not want to - // set the ReadSource to kLastApplied. - ReadSourceScope readSourceScope(opCtx); - derivedOps->emplace_back(readTransactionOperationsFromOplogChain( - opCtx, op, partialTxnList, boost::none)); - partialTxnList.clear(); - } - // Transaction entries cannot have different session updates. - _fillWriterVectors( - opCtx, &derivedOps->back(), writerVectors, derivedOps, nullptr); - } else { - // The applyOps entry was not generated as part of a transaction. - invariant(!op.getPrevWriteOpTimeInTransaction()); - derivedOps->emplace_back(ApplyOps::extractOperations(op)); + if (op.isTerminalApplyOps()) { + auto logicalSessionId = op.getSessionId(); + // applyOps entries generated by a transaction must have a sessionId and a + // transaction number. + if (logicalSessionId && op.getTxnNumber()) { + // On commit of unprepared transactions, get transactional operations from the + // oplog and fill writers with those operations. + // Flush partialTxnList operations for current transaction. + auto& partialTxnList = partialTxnOps[*logicalSessionId]; + + derivedOps->emplace_back( + readTransactionOperationsFromOplogChain(opCtx, op, partialTxnList)); + partialTxnList.clear(); - // Nested entries cannot have different session updates. - _fillWriterVectors( - opCtx, &derivedOps->back(), writerVectors, derivedOps, nullptr); - } - } catch (...) { - fassertFailedWithStatusNoTrace( - 50711, - exceptionToStatus().withContext(str::stream() - << "Unable to extract operations from applyOps " - << redact(op.toBSON()))); + // Transaction entries cannot have different session updates. + _fillWriterVectors(opCtx, &derivedOps->back(), writerVectors, derivedOps, nullptr); + } else { + // The applyOps entry was not generated as part of a transaction. + invariant(!op.getPrevWriteOpTimeInTransaction()); + derivedOps->emplace_back(ApplyOps::extractOperations(op)); + + // Nested entries cannot have different session updates. + _fillWriterVectors(opCtx, &derivedOps->back(), writerVectors, derivedOps, nullptr); } continue; } @@ -1266,24 +1240,13 @@ void SyncTail::_fillWriterVectors(OperationContext* opCtx, // If we see a commitTransaction command that is a part of a prepared transaction during // initial sync, find the prepare oplog entry, extract applyOps operations, and fill writers // with the extracted operations. - if (isPreparedCommit(op) && (_options.mode == OplogApplication::Mode::kInitialSync)) { + if (op.isPreparedCommit() && (_options.mode == OplogApplication::Mode::kInitialSync)) { auto logicalSessionId = op.getSessionId(); auto& partialTxnList = partialTxnOps[*logicalSessionId]; - { - // Traverse the oplog chain with its own snapshot and read timestamp. - ReadSourceScope readSourceScope(opCtx); - - // Get the previous oplog entry, which should be a prepare oplog entry. - const auto prevOplogEntry = getPreviousOplogEntry(opCtx, op); - invariant(prevOplogEntry.shouldPrepare()); - - // Extract the operations from the applyOps entry. - auto commitOplogEntryOpTime = op.getOpTime(); - derivedOps->emplace_back(readTransactionOperationsFromOplogChain( - opCtx, prevOplogEntry, partialTxnList, commitOplogEntryOpTime.getTimestamp())); - partialTxnList.clear(); - } + derivedOps->emplace_back( + readTransactionOperationsFromOplogChain(opCtx, op, partialTxnList)); + partialTxnList.clear(); _fillWriterVectors(opCtx, &derivedOps->back(), writerVectors, derivedOps, nullptr); continue; @@ -1300,7 +1263,7 @@ void SyncTail::_fillWriterVectors(OperationContext* opCtx, void SyncTail::fillWriterVectors(OperationContext* opCtx, MultiApplier::Operations* ops, std::vector<MultiApplier::OperationPtrs>* writerVectors, - std::vector<MultiApplier::Operations>* derivedOps) { + std::vector<MultiApplier::Operations>* derivedOps) noexcept { SessionUpdateTracker sessionUpdateTracker; _fillWriterVectors(opCtx, ops, writerVectors, derivedOps, &sessionUpdateTracker); diff --git a/src/mongo/db/repl/sync_tail.h b/src/mongo/db/repl/sync_tail.h index db036ac771a..c4923544c29 100644 --- a/src/mongo/db/repl/sync_tail.h +++ b/src/mongo/db/repl/sync_tail.h @@ -241,7 +241,7 @@ public: void fillWriterVectors(OperationContext* opCtx, MultiApplier::Operations* ops, std::vector<MultiApplier::OperationPtrs>* writerVectors, - std::vector<MultiApplier::Operations>* derivedOps); + std::vector<MultiApplier::Operations>* derivedOps) noexcept; private: class OpQueueBatcher; @@ -252,7 +252,7 @@ private: MultiApplier::Operations* ops, std::vector<MultiApplier::OperationPtrs>* writerVectors, std::vector<MultiApplier::Operations>* derivedOps, - SessionUpdateTracker* sessionUpdateTracker); + SessionUpdateTracker* sessionUpdateTracker) noexcept; /** * Doles out all the work to the writer pool threads. Does not modify writerVectors, but passes diff --git a/src/mongo/db/repl/transaction_oplog_application.cpp b/src/mongo/db/repl/transaction_oplog_application.cpp index 368e8c2200a..eb2a44af338 100644 --- a/src/mongo/db/repl/transaction_oplog_application.cpp +++ b/src/mongo/db/repl/transaction_oplog_application.cpp @@ -83,26 +83,17 @@ Status _applyOperationsForTransaction(OperationContext* opCtx, * Helper that will read the entire sequence of oplog entries for the transaction and apply each of * them. * - * Currently used for oplog application of a commitTransaction oplog entry during recovery, rollback - * and initial sync. + * Currently used for oplog application of a commitTransaction oplog entry during recovery and + * rollback. */ Status _applyTransactionFromOplogChain(OperationContext* opCtx, const OplogEntry& entry, repl::OplogApplication::Mode mode, Timestamp commitTimestamp, Timestamp durableTimestamp) { - invariant(mode == repl::OplogApplication::Mode::kRecovering || - mode == repl::OplogApplication::Mode::kInitialSync); + invariant(mode == repl::OplogApplication::Mode::kRecovering); - repl::MultiApplier::Operations ops; - { - // Traverse the oplog chain with its own snapshot and read timestamp. - ReadSourceScope readSourceScope(opCtx); - - // Get the corresponding prepare applyOps oplog entry. - const auto prepareOplogEntry = getPreviousOplogEntry(opCtx, entry); - ops = readTransactionOperationsFromOplogChain(opCtx, prepareOplogEntry, {}, boost::none); - } + auto ops = readTransactionOperationsFromOplogChain(opCtx, entry, {}); const auto dbName = entry.getNss().db().toString(); Status status = Status::OK(); @@ -151,107 +142,129 @@ const repl::OplogEntry getPreviousOplogEntry(OperationContext* opCtx, Status applyCommitTransaction(OperationContext* opCtx, const OplogEntry& entry, repl::OplogApplication::Mode mode) { - // Return error if run via applyOps command. - uassert(50987, - "commitTransaction is only used internally by secondaries.", - mode != repl::OplogApplication::Mode::kApplyOpsCmd); - IDLParserErrorContext ctx("commitTransaction"); auto commitOplogEntryOpTime = entry.getOpTime(); auto commitCommand = CommitTransactionOplogObject::parse(ctx, entry.getObject()); invariant(commitCommand.getCommitTimestamp()); - if (mode == repl::OplogApplication::Mode::kRecovering) { - return _applyTransactionFromOplogChain(opCtx, - entry, - mode, - *commitCommand.getCommitTimestamp(), - commitOplogEntryOpTime.getTimestamp()); + switch (mode) { + case repl::OplogApplication::Mode::kRecovering: { + return _applyTransactionFromOplogChain(opCtx, + entry, + mode, + *commitCommand.getCommitTimestamp(), + commitOplogEntryOpTime.getTimestamp()); + } + case repl::OplogApplication::Mode::kInitialSync: { + // Initial sync should never apply 'commitTransaction' since it unpacks committed + // transactions onto various applier threads. + MONGO_UNREACHABLE; + } + case repl::OplogApplication::Mode::kApplyOpsCmd: { + // Return error if run via applyOps command. + uasserted(50987, "commitTransaction is only used internally by secondaries."); + } + case repl::OplogApplication::Mode::kSecondary: { + // Transaction operations are in its own batch, so we can modify their opCtx. + invariant(entry.getSessionId()); + invariant(entry.getTxnNumber()); + opCtx->setLogicalSessionId(*entry.getSessionId()); + opCtx->setTxnNumber(*entry.getTxnNumber()); + opCtx->setInMultiDocumentTransaction(); + + // The write on transaction table may be applied concurrently, so refreshing state + // from disk may read that write, causing starting a new transaction on an existing + // txnNumber. Thus, we start a new transaction without refreshing state from disk. + MongoDOperationContextSessionWithoutRefresh sessionCheckout(opCtx); + + auto transaction = TransactionParticipant::get(opCtx); + invariant(transaction); + transaction.unstashTransactionResources(opCtx, "commitTransaction"); + transaction.commitPreparedTransaction( + opCtx, *commitCommand.getCommitTimestamp(), commitOplogEntryOpTime); + return Status::OK(); + } } - - invariant(mode == repl::OplogApplication::Mode::kSecondary); - - // Transaction operations are in its own batch, so we can modify their opCtx. - invariant(entry.getSessionId()); - invariant(entry.getTxnNumber()); - opCtx->setLogicalSessionId(*entry.getSessionId()); - opCtx->setTxnNumber(*entry.getTxnNumber()); - opCtx->setInMultiDocumentTransaction(); - - // The write on transaction table may be applied concurrently, so refreshing state - // from disk may read that write, causing starting a new transaction on an existing - // txnNumber. Thus, we start a new transaction without refreshing state from disk. - MongoDOperationContextSessionWithoutRefresh sessionCheckout(opCtx); - - auto transaction = TransactionParticipant::get(opCtx); - invariant(transaction); - transaction.unstashTransactionResources(opCtx, "commitTransaction"); - transaction.commitPreparedTransaction( - opCtx, *commitCommand.getCommitTimestamp(), commitOplogEntryOpTime); - return Status::OK(); + MONGO_UNREACHABLE; } Status applyAbortTransaction(OperationContext* opCtx, const OplogEntry& entry, repl::OplogApplication::Mode mode) { - // Return error if run via applyOps command. - uassert(50972, - "abortTransaction is only used internally by secondaries.", - mode != repl::OplogApplication::Mode::kApplyOpsCmd); - - // We don't put transactions into the prepare state until the end of recovery and initial sync, - // so there is no transaction to abort. - if (mode == repl::OplogApplication::Mode::kRecovering || - mode == repl::OplogApplication::Mode::kInitialSync) { - return Status::OK(); + switch (mode) { + case repl::OplogApplication::Mode::kRecovering: { + // We don't put transactions into the prepare state until the end of recovery, + // so there is no transaction to abort. + return Status::OK(); + } + case repl::OplogApplication::Mode::kInitialSync: { + // We don't put transactions into the prepare state until the end of initial sync, + // so there is no transaction to abort. + return Status::OK(); + } + case repl::OplogApplication::Mode::kApplyOpsCmd: { + // Return error if run via applyOps command. + uasserted(50972, "abortTransaction is only used internally by secondaries."); + } + case repl::OplogApplication::Mode::kSecondary: { + // Transaction operations are in its own batch, so we can modify their opCtx. + invariant(entry.getSessionId()); + invariant(entry.getTxnNumber()); + opCtx->setLogicalSessionId(*entry.getSessionId()); + opCtx->setTxnNumber(*entry.getTxnNumber()); + opCtx->setInMultiDocumentTransaction(); + + // The write on transaction table may be applied concurrently, so refreshing state + // from disk may read that write, causing starting a new transaction on an existing + // txnNumber. Thus, we start a new transaction without refreshing state from disk. + MongoDOperationContextSessionWithoutRefresh sessionCheckout(opCtx); + + auto transaction = TransactionParticipant::get(opCtx); + transaction.unstashTransactionResources(opCtx, "abortTransaction"); + transaction.abortTransaction(opCtx); + return Status::OK(); + } } - - invariant(mode == repl::OplogApplication::Mode::kSecondary); - - // Transaction operations are in its own batch, so we can modify their opCtx. - invariant(entry.getSessionId()); - invariant(entry.getTxnNumber()); - opCtx->setLogicalSessionId(*entry.getSessionId()); - opCtx->setTxnNumber(*entry.getTxnNumber()); - opCtx->setInMultiDocumentTransaction(); - - // The write on transaction table may be applied concurrently, so refreshing state - // from disk may read that write, causing starting a new transaction on an existing - // txnNumber. Thus, we start a new transaction without refreshing state from disk. - MongoDOperationContextSessionWithoutRefresh sessionCheckout(opCtx); - - auto transaction = TransactionParticipant::get(opCtx); - transaction.unstashTransactionResources(opCtx, "abortTransaction"); - transaction.abortTransaction(opCtx); - return Status::OK(); + MONGO_UNREACHABLE; } repl::MultiApplier::Operations readTransactionOperationsFromOplogChain( OperationContext* opCtx, - const OplogEntry& commitOrPrepare, - const std::vector<OplogEntry*>& cachedOps, - boost::optional<Timestamp> commitOplogEntryTS) { - repl::MultiApplier::Operations ops; + const OplogEntry& lastEntryInTxn, + const std::vector<OplogEntry*>& cachedOps) noexcept { + // Traverse the oplog chain with its own snapshot and read timestamp. + ReadSourceScope readSourceScope(opCtx); - // Get the previous oplog entry. - auto currentOpTime = commitOrPrepare.getOpTime(); + repl::MultiApplier::Operations ops; // The cachedOps are the ops for this transaction that are from the same oplog application batch // as the commit or prepare, those which have not necessarily been written to the oplog. These // ops are in order of increasing timestamp. + const auto oldestEntryInBatch = cachedOps.empty() ? lastEntryInTxn : *cachedOps.front(); - // The lastEntryOpTime is the OpTime of the last (latest OpTime) entry for this transaction + // The lastEntryWrittenToOplogOpTime is the OpTime of the latest entry for this transaction // which is expected to be present in the oplog. It is the entry before the first cachedOp, // unless there are no cachedOps in which case it is the entry before the commit or prepare. - const auto lastEntryOpTime = (cachedOps.empty() ? commitOrPrepare : *cachedOps.front()) - .getPrevWriteOpTimeInTransaction(); - invariant(lastEntryOpTime < currentOpTime); + const auto lastEntryWrittenToOplogOpTime = oldestEntryInBatch.getPrevWriteOpTimeInTransaction(); + invariant(lastEntryWrittenToOplogOpTime < lastEntryInTxn.getOpTime()); + + TransactionHistoryIterator iter(lastEntryWrittenToOplogOpTime.get()); + + // If we started with a prepared commit, we want to forget about that operation and move onto + // the prepare. + auto prepareOrUnpreparedCommit = lastEntryInTxn; + if (lastEntryInTxn.isPreparedCommit()) { + // A prepared-commit must be in its own batch and thus have no cached ops. + invariant(cachedOps.empty()); + invariant(iter.hasNext()); + prepareOrUnpreparedCommit = iter.nextFatalOnErrors(opCtx); + } + invariant(prepareOrUnpreparedCommit.getCommandType() == OplogEntry::CommandType::kApplyOps); - TransactionHistoryIterator iter(lastEntryOpTime.get()); - // Empty commits are not allowed, but empty prepares are. - invariant(commitOrPrepare.getCommandType() != OplogEntry::CommandType::kCommitTransaction || - !cachedOps.empty() || iter.hasNext()); - auto commitOrPrepareObj = commitOrPrepare.toBSON(); + // The non-DurableReplOperation fields of the extracted transaction operations will match those + // of the lastEntryInTxn. For a prepared commit, this will include the commit oplog entry's + // 'ts' field, which is what we want. + auto lastEntryInTxnObj = lastEntryInTxn.toBSON(); // First retrieve and transform the ops from the oplog, which will be retrieved in reverse // order. @@ -259,8 +272,8 @@ repl::MultiApplier::Operations readTransactionOperationsFromOplogChain( const auto& operationEntry = iter.nextFatalOnErrors(opCtx); invariant(operationEntry.isPartialTransaction()); auto prevOpsEnd = ops.size(); - repl::ApplyOps::extractOperationsTo( - operationEntry, commitOrPrepareObj, &ops, commitOplogEntryTS); + repl::ApplyOps::extractOperationsTo(operationEntry, lastEntryInTxnObj, &ops); + // Because BSONArrays do not have fast way of determining size without iterating through // them, and we also have no way of knowing how many oplog entries are in a transaction // without iterating, reversing each applyOps and then reversing the whole array is @@ -275,15 +288,11 @@ repl::MultiApplier::Operations readTransactionOperationsFromOplogChain( for (auto* cachedOp : cachedOps) { const auto& operationEntry = *cachedOp; invariant(operationEntry.isPartialTransaction()); - repl::ApplyOps::extractOperationsTo( - operationEntry, commitOrPrepareObj, &ops, commitOplogEntryTS); + repl::ApplyOps::extractOperationsTo(operationEntry, lastEntryInTxnObj, &ops); } - // Reconstruct the operations from the commit or prepare oplog entry. - if (commitOrPrepare.getCommandType() == OplogEntry::CommandType::kApplyOps) { - repl::ApplyOps::extractOperationsTo( - commitOrPrepare, commitOrPrepareObj, &ops, commitOplogEntryTS); - } + // Reconstruct the operations from the prepare or unprepared commit oplog entry. + repl::ApplyOps::extractOperationsTo(prepareOrUnpreparedCommit, lastEntryInTxnObj, &ops); return ops; } @@ -294,18 +303,15 @@ namespace { */ Status _applyPrepareTransaction(OperationContext* opCtx, const OplogEntry& entry, - repl::OplogApplication::Mode oplogApplicationMode) { + repl::OplogApplication::Mode mode) { // The operations here are reconstructed at their prepare time. However, that time will // be ignored because there is an outer write unit of work during their application. // The prepare time of the transaction is set explicitly below. - auto ops = [&] { - ReadSourceScope readSourceScope(opCtx); - return readTransactionOperationsFromOplogChain(opCtx, entry, {}, boost::none); - }(); + auto ops = readTransactionOperationsFromOplogChain(opCtx, entry, {}); - if (oplogApplicationMode == repl::OplogApplication::Mode::kRecovering || - oplogApplicationMode == repl::OplogApplication::Mode::kInitialSync) { + if (mode == repl::OplogApplication::Mode::kRecovering || + mode == repl::OplogApplication::Mode::kInitialSync) { // We might replay a prepared transaction behind oldest timestamp. Note that since this is // scoped to the storage transaction, and readTransactionOperationsFromOplogChain implicitly // abandons the storage transaction when it releases the global lock, this must be done @@ -346,11 +352,11 @@ Status _applyPrepareTransaction(OperationContext* opCtx, // Set this in case the application of any ops need to use the prepare timestamp of this // transaction. It should be cleared automatically when the transaction finishes. - if (oplogApplicationMode == repl::OplogApplication::Mode::kRecovering) { + if (mode == repl::OplogApplication::Mode::kRecovering) { transaction.setPrepareOpTimeForRecovery(opCtx, entry.getOpTime()); } - auto status = _applyOperationsForTransaction(opCtx, ops, oplogApplicationMode); + auto status = _applyOperationsForTransaction(opCtx, ops, mode); fassert(31137, status); if (MONGO_FAIL_POINT(applyOpsHangBeforePreparingTransaction)) { @@ -366,54 +372,70 @@ Status _applyPrepareTransaction(OperationContext* opCtx, } /** - * Apply a prepared transaction during recovery. + * Apply a prepared transaction when we are reconstructing prepared transactions. */ -Status applyRecoveredPrepareTransaction(OperationContext* opCtx, - const OplogEntry& entry, - repl::OplogApplication::Mode mode) { - // Snapshot transactions never conflict with the PBWM lock. - invariant(!opCtx->lockState()->shouldConflictWithSecondaryBatchApplication()); +void _reconstructPreparedTransaction(OperationContext* opCtx, + const OplogEntry& prepareEntry, + repl::OplogApplication::Mode mode) { + repl::UnreplicatedWritesBlock uwb(opCtx); + + // Snapshot transaction can never conflict with the PBWM lock. + opCtx->lockState()->setShouldConflictWithSecondaryBatchApplication(false); + + // When querying indexes, we return the record matching the key if it exists, or an + // adjacent document. This means that it is possible for us to hit a prepare conflict if + // we query for an incomplete key and an adjacent key is prepared. + // We ignore prepare conflicts on recovering nodes because they may encounter prepare + // conflicts that did not occur on the primary. + opCtx->recoveryUnit()->setPrepareConflictBehavior( + PrepareConflictBehavior::kIgnoreConflictsAllowWrites); + // We might replay a prepared transaction behind oldest timestamp. opCtx->recoveryUnit()->setRoundUpPreparedTimestamps(true); - return _applyPrepareTransaction(opCtx, entry, mode); + + // Checks out the session, applies the operations and prepares the transaction. + uassertStatusOK(_applyPrepareTransaction(opCtx, prepareEntry, mode)); } } // namespace /** - * Make sure that if we are in replication recovery or initial sync, we don't apply the prepare - * transaction oplog entry until we either see a commit transaction oplog entry or are at the very - * end of recovery/initial sync. Otherwise, only apply the prepare transaction oplog entry if we are - * a secondary. + * Make sure that if we are in replication recovery, we don't apply the prepare transaction oplog + * entry until we either see a commit transaction oplog entry or are at the very end of recovery. + * Otherwise, only apply the prepare transaction oplog entry if we are a secondary. We shouldn't get + * here for initial sync and applyOps should error. */ Status applyPrepareTransaction(OperationContext* opCtx, const OplogEntry& entry, - repl::OplogApplication::Mode oplogApplicationMode) { - // Don't apply the operations from the prepared transaction until either we see a commit - // transaction oplog entry during recovery or are at the end of recovery. - if (oplogApplicationMode == repl::OplogApplication::Mode::kRecovering) { - if (!serverGlobalParams.enableMajorityReadConcern) { - error() << "Cannot replay a prepared transaction when 'enableMajorityReadConcern' is " + repl::OplogApplication::Mode mode) { + switch (mode) { + case repl::OplogApplication::Mode::kRecovering: { + if (!serverGlobalParams.enableMajorityReadConcern) { + error() + << "Cannot replay a prepared transaction when 'enableMajorityReadConcern' is " "set to false. Restart the server with --enableMajorityReadConcern=true " "to complete recovery."; - } - fassert(51146, serverGlobalParams.enableMajorityReadConcern); - return Status::OK(); - } + fassertFailed(51146); + } - // Don't apply the operations from the prepared transaction until either we see a commit - // transaction oplog entry during the oplog application phase of initial sync or are at the end - // of initial sync. - if (oplogApplicationMode == repl::OplogApplication::Mode::kInitialSync) { - return Status::OK(); + // Don't apply the operations from the prepared transaction until either we see a commit + // transaction oplog entry during recovery or are at the end of recovery. + return Status::OK(); + } + case repl::OplogApplication::Mode::kInitialSync: { + // Initial sync should never apply 'prepareTransaction' since it unpacks committed + // transactions onto various applier threads at commit time. + MONGO_UNREACHABLE; + } + case repl::OplogApplication::Mode::kApplyOpsCmd: { + // Return error if run via applyOps command. + uasserted(51145, + "prepare applyOps oplog entry is only used internally by secondaries."); + } + case repl::OplogApplication::Mode::kSecondary: { + return _applyPrepareTransaction(opCtx, entry, repl::OplogApplication::Mode::kSecondary); + } } - - // Return error if run via applyOps command. - uassert(51145, - "prepare applyOps oplog entry is only used internally by secondaries.", - oplogApplicationMode != repl::OplogApplication::Mode::kApplyOpsCmd); - - invariant(oplogApplicationMode == repl::OplogApplication::Mode::kSecondary); - return _applyPrepareTransaction(opCtx, entry, oplogApplicationMode); + MONGO_UNREACHABLE; } void reconstructPreparedTransactions(OperationContext* opCtx, repl::OplogApplication::Mode mode) { @@ -454,22 +476,7 @@ void reconstructPreparedTransactions(OperationContext* opCtx, repl::OplogApplica opCtx->getServiceContext()->makeClient("reconstruct-prepared-transactions"); AlternativeClientRegion acr(newClient); const auto newOpCtx = cc().makeOperationContext(); - repl::UnreplicatedWritesBlock uwb(newOpCtx.get()); - - // Snapshot transaction can never conflict with the PBWM lock. - newOpCtx->lockState()->setShouldConflictWithSecondaryBatchApplication(false); - - // When querying indexes, we return the record matching the key if it exists, or an - // adjacent document. This means that it is possible for us to hit a prepare conflict if - // we query for an incomplete key and an adjacent key is prepared. - // We ignore prepare conflicts on recovering nodes because they may encounter prepare - // conflicts that did not occur on the primary. - newOpCtx->recoveryUnit()->setPrepareConflictBehavior( - PrepareConflictBehavior::kIgnoreConflictsAllowWrites); - - // Checks out the session, applies the operations and prepares the transaction. - uassertStatusOK( - applyRecoveredPrepareTransaction(newOpCtx.get(), prepareOplogEntry, mode)); + _reconstructPreparedTransaction(newOpCtx.get(), prepareOplogEntry, mode); } } } diff --git a/src/mongo/db/repl/transaction_oplog_application.h b/src/mongo/db/repl/transaction_oplog_application.h index 5ad20f48875..ad85ccca6fa 100644 --- a/src/mongo/db/repl/transaction_oplog_application.h +++ b/src/mongo/db/repl/transaction_oplog_application.h @@ -63,8 +63,7 @@ const repl::OplogEntry getPreviousOplogEntry(OperationContext* opCtx, repl::MultiApplier::Operations readTransactionOperationsFromOplogChain( OperationContext* opCtx, const repl::OplogEntry& entry, - const std::vector<repl::OplogEntry*>& cachedOps, - boost::optional<Timestamp> commitOplogEntryTS); + const std::vector<repl::OplogEntry*>& cachedOps) noexcept; /** * Apply `prepareTransaction` oplog entry. |