diff options
author | Samy Lanka <samy.lanka@mongodb.com> | 2019-04-30 21:59:26 -0400 |
---|---|---|
committer | Samy Lanka <samy.lanka@mongodb.com> | 2019-05-06 17:23:18 -0400 |
commit | 8ad1effc48ac5193c5f57630d1fbce8bda0cfdaf (patch) | |
tree | f12beec2e31fc5a189bbf1943682f2b05a77972d /src/mongo/db/repl/transaction_oplog_application.cpp | |
parent | a25226e009fa1598f3077dd7972b9be3d2368785 (diff) | |
download | mongo-8ad1effc48ac5193c5f57630d1fbce8bda0cfdaf.tar.gz |
SERVER-36492 Reconstruct prepared transactions at the end of initial sync
Diffstat (limited to 'src/mongo/db/repl/transaction_oplog_application.cpp')
-rw-r--r-- | src/mongo/db/repl/transaction_oplog_application.cpp | 94 |
1 files changed, 79 insertions, 15 deletions
diff --git a/src/mongo/db/repl/transaction_oplog_application.cpp b/src/mongo/db/repl/transaction_oplog_application.cpp index 3af7319fefb..35394e783db 100644 --- a/src/mongo/db/repl/transaction_oplog_application.cpp +++ b/src/mongo/db/repl/transaction_oplog_application.cpp @@ -37,8 +37,10 @@ #include "mongo/db/catalog_raii.h" #include "mongo/db/commands/txn_cmds_gen.h" #include "mongo/db/concurrency/write_conflict_exception.h" +#include "mongo/db/dbdirectclient.h" #include "mongo/db/index_builds_coordinator.h" #include "mongo/db/repl/apply_ops.h" +#include "mongo/db/repl/storage_interface_impl.h" #include "mongo/db/repl/timestamp_block.h" #include "mongo/db/session_catalog_mongod.h" #include "mongo/db/transaction_history_iterator.h" @@ -51,6 +53,9 @@ namespace { // If enabled, causes _applyPrepareTransaction to hang before preparing the transaction participant. MONGO_FAIL_POINT_DEFINE(applyPrepareCommandHangBeforePreparingTransaction); +// Failpoint that will cause reconstructPreparedTransactions to return early. +MONGO_FAIL_POINT_DEFINE(skipReconstructPreparedTransactions); + // Apply the oplog entries for a prepare or a prepared commit during recovery/initial sync. Status _applyOperationsForTransaction(OperationContext* opCtx, @@ -176,6 +181,7 @@ Status applyCommitTransaction(OperationContext* opCtx, invariant(entry.getTxnNumber()); opCtx->setLogicalSessionId(*entry.getSessionId()); opCtx->setTxnNumber(*entry.getTxnNumber()); + // The write on transaction table may be applied concurrently, so refreshing state // from disk may read that write, causing starting a new transaction on an existing // txnNumber. Thus, we start a new transaction without refreshing state from disk. @@ -197,13 +203,13 @@ Status applyAbortTransaction(OperationContext* opCtx, "abortTransaction is only used internally by secondaries.", mode != repl::OplogApplication::Mode::kApplyOpsCmd); - // We don't put transactions into the prepare state until the end of recovery, so there is - // no transaction to abort. - if (mode == repl::OplogApplication::Mode::kRecovering) { + // We don't put transactions into the prepare state until the end of recovery and initial sync, + // so there is no transaction to abort. + if (mode == repl::OplogApplication::Mode::kRecovering || + mode == repl::OplogApplication::Mode::kInitialSync) { return Status::OK(); } - // TODO: SERVER-36492 Only run on secondary until we support initial sync. invariant(mode == repl::OplogApplication::Mode::kSecondary); // Transaction operations are in its own batch, so we can modify their opCtx. @@ -294,8 +300,8 @@ repl::MultiApplier::Operations readTransactionOperationsFromOplogChain( namespace { /** - * This is the part of applyPrepareTransaction which is common to steady state and recovery - * oplog application. + * This is the part of applyPrepareTransaction which is common to steady state, initial sync and + * recovery oplog application. */ Status _applyPrepareTransaction(OperationContext* opCtx, const OplogEntry& entry, @@ -309,7 +315,8 @@ Status _applyPrepareTransaction(OperationContext* opCtx, return readTransactionOperationsFromOplogChain(opCtx, entry, {}); }(); - if (oplogApplicationMode == repl::OplogApplication::Mode::kRecovering) { + if (oplogApplicationMode == repl::OplogApplication::Mode::kRecovering || + oplogApplicationMode == repl::OplogApplication::Mode::kInitialSync) { // We might replay a prepared transaction behind oldest timestamp. Note that since this is // scoped to the storage transaction, and readTransactionOperationsFromOplogChain implicitly // abandons the storage transaction when it releases the global lock, this must be done @@ -357,6 +364,23 @@ Status _applyPrepareTransaction(OperationContext* opCtx, return Status::OK(); } + +/** + * Apply a prepared transaction during recovery. The OplogEntry must be an 'applyOps' with + * 'prepare' set or a prepareTransaction command. + */ +Status applyRecoveredPrepareTransaction(OperationContext* opCtx, + const OplogEntry& entry, + repl::OplogApplication::Mode mode) { + // Snapshot transactions never conflict with the PBWM lock. + invariant(!opCtx->lockState()->shouldConflictWithSecondaryBatchApplication()); + if (entry.getCommandType() == OplogEntry::CommandType::kPrepareTransaction) { + return _applyPrepareTransaction(opCtx, entry, mode); + } else { + // This is an applyOps with prepare. + return applyRecoveredPrepareApplyOpsOplogEntry(opCtx, entry, mode); + } +} } // namespace /** @@ -396,14 +420,54 @@ Status applyPrepareTransaction(OperationContext* opCtx, return _applyPrepareTransaction(opCtx, entry, oplogApplicationMode); } -Status applyRecoveredPrepareTransaction(OperationContext* opCtx, const OplogEntry& entry) { - // Snapshot transactions never conflict with the PBWM lock. - invariant(!opCtx->lockState()->shouldConflictWithSecondaryBatchApplication()); - if (entry.getCommandType() == OplogEntry::CommandType::kPrepareTransaction) { - return _applyPrepareTransaction(opCtx, entry, repl::OplogApplication::Mode::kRecovering); - } else { - // This is an applyOps with prepare. - return applyRecoveredPrepareApplyOpsOplogEntry(opCtx, entry); +void reconstructPreparedTransactions(OperationContext* opCtx, repl::OplogApplication::Mode mode) { + if (MONGO_FAIL_POINT(skipReconstructPreparedTransactions)) { + log() << "Hit skipReconstructPreparedTransactions failpoint"; + return; + } + // Read the transactions table with its own snapshot and read timestamp. + ReadSourceScope readSourceScope(opCtx); + + DBDirectClient client(opCtx); + const auto cursor = client.query(NamespaceString::kSessionTransactionsTableNamespace, + {BSON("state" + << "prepared")}); + + // Iterate over each entry in the transactions table that has a prepared transaction. + while (cursor->more()) { + const auto txnRecordObj = cursor->next(); + const auto txnRecord = SessionTxnRecord::parse( + IDLParserErrorContext("recovering prepared transaction"), txnRecordObj); + + invariant(txnRecord.getState() == DurableTxnStateEnum::kPrepared); + + // Get the prepareTransaction oplog entry corresponding to this transactions table entry. + const auto prepareOpTime = txnRecord.getLastWriteOpTime(); + invariant(!prepareOpTime.isNull()); + TransactionHistoryIterator iter(prepareOpTime); + invariant(iter.hasNext()); + auto prepareOplogEntry = iter.next(opCtx); + + { + // Make a new opCtx so that we can set the lsid when applying the prepare transaction + // oplog entry. + auto newClient = + opCtx->getServiceContext()->makeClient("reconstruct-prepared-transactions"); + AlternativeClientRegion acr(newClient); + const auto newOpCtx = cc().makeOperationContext(); + repl::UnreplicatedWritesBlock uwb(newOpCtx.get()); + + // Snapshot transaction can never conflict with the PBWM lock. + newOpCtx->lockState()->setShouldConflictWithSecondaryBatchApplication(false); + + // TODO: SERVER-40177 This should be removed once it is guaranteed operations applied on + // recovering nodes cannot encounter unnecessary prepare conflicts. + newOpCtx->recoveryUnit()->setIgnorePrepared(true); + + // Checks out the session, applies the operations and prepares the transaction. + uassertStatusOK( + applyRecoveredPrepareTransaction(newOpCtx.get(), prepareOplogEntry, mode)); + } } } |