summary refs log tree commit diff
path: root/src/mongo/db/repl/transaction_oplog_application.cpp
diff options
context:
space:
mode:
author Samy Lanka <samy.lanka@mongodb.com> 2019-04-30 21:59:26 -0400
committer Samy Lanka <samy.lanka@mongodb.com> 2019-05-06 17:23:18 -0400
commit 8ad1effc48ac5193c5f57630d1fbce8bda0cfdaf (patch)
tree f12beec2e31fc5a189bbf1943682f2b05a77972d /src/mongo/db/repl/transaction_oplog_application.cpp
parent a25226e009fa1598f3077dd7972b9be3d2368785 (diff)
download mongo-8ad1effc48ac5193c5f57630d1fbce8bda0cfdaf.tar.gz
SERVER-36492 Reconstruct prepared transactions at the end of initial sync
Diffstat (limited to 'src/mongo/db/repl/transaction_oplog_application.cpp')
-rw-r--r-- src/mongo/db/repl/transaction_oplog_application.cpp 94
1 files changed, 79 insertions, 15 deletions
diff --git a/src/mongo/db/repl/transaction_oplog_application.cpp b/src/mongo/db/repl/transaction_oplog_application.cpp
index 3af7319fefb..35394e783db 100644
--- a/src/mongo/db/repl/transaction_oplog_application.cpp
+++ b/src/mongo/db/repl/transaction_oplog_application.cpp
@@ -37,8 +37,10 @@
#include "mongo/db/catalog_raii.h"
#include "mongo/db/commands/txn_cmds_gen.h"
#include "mongo/db/concurrency/write_conflict_exception.h"
+#include "mongo/db/dbdirectclient.h"
#include "mongo/db/index_builds_coordinator.h"
#include "mongo/db/repl/apply_ops.h"
+#include "mongo/db/repl/storage_interface_impl.h"
#include "mongo/db/repl/timestamp_block.h"
#include "mongo/db/session_catalog_mongod.h"
#include "mongo/db/transaction_history_iterator.h"
@@ -51,6 +53,9 @@ namespace {
// If enabled, causes _applyPrepareTransaction to hang before preparing the transaction participant.
MONGO_FAIL_POINT_DEFINE(applyPrepareCommandHangBeforePreparingTransaction);
+// Failpoint that will cause reconstructPreparedTransactions to return early.
+MONGO_FAIL_POINT_DEFINE(skipReconstructPreparedTransactions);
+
// Apply the oplog entries for a prepare or a prepared commit during recovery/initial sync.
Status _applyOperationsForTransaction(OperationContext* opCtx,
@@ -176,6 +181,7 @@ Status applyCommitTransaction(OperationContext* opCtx,
invariant(entry.getTxnNumber());
opCtx->setLogicalSessionId(*entry.getSessionId());
opCtx->setTxnNumber(*entry.getTxnNumber());
+
// The write on transaction table may be applied concurrently, so refreshing state
// from disk may read that write, causing starting a new transaction on an existing
// txnNumber. Thus, we start a new transaction without refreshing state from disk.
@@ -197,13 +203,13 @@ Status applyAbortTransaction(OperationContext* opCtx,
"abortTransaction is only used internally by secondaries.",
mode != repl::OplogApplication::Mode::kApplyOpsCmd);
- // We don't put transactions into the prepare state until the end of recovery, so there is
- // no transaction to abort.
- if (mode == repl::OplogApplication::Mode::kRecovering) {
+ // We don't put transactions into the prepare state until the end of recovery and initial sync,
+ // so there is no transaction to abort.
+ if (mode == repl::OplogApplication::Mode::kRecovering ||
+ mode == repl::OplogApplication::Mode::kInitialSync) {
return Status::OK();
}
- // TODO: SERVER-36492 Only run on secondary until we support initial sync.
invariant(mode == repl::OplogApplication::Mode::kSecondary);
// Transaction operations are in its own batch, so we can modify their opCtx.
@@ -294,8 +300,8 @@ repl::MultiApplier::Operations readTransactionOperationsFromOplogChain(
namespace {
/**
- * This is the part of applyPrepareTransaction which is common to steady state and recovery
- * oplog application.
+ * This is the part of applyPrepareTransaction which is common to steady state, initial sync and
+ * recovery oplog application.
*/
Status _applyPrepareTransaction(OperationContext* opCtx,
const OplogEntry& entry,
@@ -309,7 +315,8 @@ Status _applyPrepareTransaction(OperationContext* opCtx,
return readTransactionOperationsFromOplogChain(opCtx, entry, {});
}();
- if (oplogApplicationMode == repl::OplogApplication::Mode::kRecovering) {
+ if (oplogApplicationMode == repl::OplogApplication::Mode::kRecovering ||
+ oplogApplicationMode == repl::OplogApplication::Mode::kInitialSync) {
// We might replay a prepared transaction behind oldest timestamp. Note that since this is
// scoped to the storage transaction, and readTransactionOperationsFromOplogChain implicitly
// abandons the storage transaction when it releases the global lock, this must be done
@@ -357,6 +364,23 @@ Status _applyPrepareTransaction(OperationContext* opCtx,
return Status::OK();
}
+
+/**
+ * Applies a prepared transaction while prepared transactions are being reconstructed, i.e. at
+ * the end of recovery or of initial sync. The OplogEntry must be an 'applyOps' with 'prepare'
+ * set or a prepareTransaction command.
+ *
+ * 'mode' is forwarded unchanged to whichever helper handles the entry's command type, and that
+ * helper's Status is returned as-is. The opCtx passed in must already be configured not to
+ * conflict with secondary batch application (the PBWM lock); this is enforced by an invariant.
+ */
+Status applyRecoveredPrepareTransaction(OperationContext* opCtx,
+ const OplogEntry& entry,
+ repl::OplogApplication::Mode mode) {
+ // Snapshot transactions never conflict with the PBWM lock.
+ invariant(!opCtx->lockState()->shouldConflictWithSecondaryBatchApplication());
+ if (entry.getCommandType() == OplogEntry::CommandType::kPrepareTransaction) {
+ return _applyPrepareTransaction(opCtx, entry, mode);
+ } else {
+ // This is an applyOps with prepare.
+ return applyRecoveredPrepareApplyOpsOplogEntry(opCtx, entry, mode);
+ }
+}
} // namespace
/**
@@ -396,14 +420,54 @@ Status applyPrepareTransaction(OperationContext* opCtx,
return _applyPrepareTransaction(opCtx, entry, oplogApplicationMode);
}
-Status applyRecoveredPrepareTransaction(OperationContext* opCtx, const OplogEntry& entry) {
- // Snapshot transactions never conflict with the PBWM lock.
- invariant(!opCtx->lockState()->shouldConflictWithSecondaryBatchApplication());
- if (entry.getCommandType() == OplogEntry::CommandType::kPrepareTransaction) {
- return _applyPrepareTransaction(opCtx, entry, repl::OplogApplication::Mode::kRecovering);
- } else {
- // This is an applyOps with prepare.
- return applyRecoveredPrepareApplyOpsOplogEntry(opCtx, entry);
+void reconstructPreparedTransactions(OperationContext* opCtx, repl::OplogApplication::Mode mode) {
+ if (MONGO_FAIL_POINT(skipReconstructPreparedTransactions)) {
+ log() << "Hit skipReconstructPreparedTransactions failpoint";
+ return;
+ }
+ // Read the transactions table with its own snapshot and read timestamp.
+ ReadSourceScope readSourceScope(opCtx);
+ // Find every transactions-table record whose state is "prepared"; each is re-prepared below.
+ DBDirectClient client(opCtx);
+ const auto cursor = client.query(NamespaceString::kSessionTransactionsTableNamespace,
+ {BSON("state"
+ << "prepared")});
+
+ // Iterate over each entry in the transactions table that has a prepared transaction.
+ while (cursor->more()) {
+ const auto txnRecordObj = cursor->next();
+ const auto txnRecord = SessionTxnRecord::parse(
+ IDLParserErrorContext("recovering prepared transaction"), txnRecordObj);
+
+ invariant(txnRecord.getState() == DurableTxnStateEnum::kPrepared);
+
+ // Get the prepareTransaction oplog entry corresponding to this transactions table entry.
+ const auto prepareOpTime = txnRecord.getLastWriteOpTime();
+ invariant(!prepareOpTime.isNull());
+ TransactionHistoryIterator iter(prepareOpTime);
+ invariant(iter.hasNext());
+ auto prepareOplogEntry = iter.next(opCtx);
+
+ {
+ // Make a new opCtx so that we can set the lsid when applying the prepare transaction
+ // oplog entry.
+ auto newClient =
+ opCtx->getServiceContext()->makeClient("reconstruct-prepared-transactions");
+ AlternativeClientRegion acr(newClient);
+ const auto newOpCtx = cc().makeOperationContext();
+ repl::UnreplicatedWritesBlock uwb(newOpCtx.get());
+
+ // Snapshot transaction can never conflict with the PBWM lock.
+ newOpCtx->lockState()->setShouldConflictWithSecondaryBatchApplication(false);
+
+ // TODO: SERVER-40177 This should be removed once it is guaranteed operations applied on
+ // recovering nodes cannot encounter unnecessary prepare conflicts.
+ newOpCtx->recoveryUnit()->setIgnorePrepared(true);
+ // Checks out the session, applies the operations and prepares the transaction. A non-OK
+ // Status is handed to uassertStatusOK rather than returned (this function returns void).
+ uassertStatusOK(
+ applyRecoveredPrepareTransaction(newOpCtx.get(), prepareOplogEntry, mode));
+ }
}
}