summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorJudah Schvimer <judah.schvimer@10gen.com>2019-10-11 17:57:43 +0000
committerevergreen <evergreen@mongodb.com>2019-10-11 17:57:43 +0000
commit20883d237ab0e4ee45c3aa6bc5f00602265402b1 (patch)
treed08d08979c909e7e45b2dd9bbba3483ff1652b63
parent795f9bc3047184cc27f1643fa7c06bf2386f6218 (diff)
downloadmongo-20883d237ab0e4ee45c3aa6bc5f00602265402b1.tar.gz
SERVER-41437 minor transaction_oplog_application.js clean up
SERVER-41437 unify transaction oplog traversal (cherry picked from commit 6dabe5871fcd280f654e475e7048385b54b1ea64) (cherry picked from commit f14d9c4c07e69c2109de0af059123060c73cdd77)
-rw-r--r--src/mongo/db/repl/apply_ops.cpp13
-rw-r--r--src/mongo/db/repl/apply_ops.h10
-rw-r--r--src/mongo/db/repl/idempotency_test_fixture.cpp13
-rw-r--r--src/mongo/db/repl/oplog_entry.h17
-rw-r--r--src/mongo/db/repl/sync_tail.cpp99
-rw-r--r--src/mongo/db/repl/sync_tail.h4
-rw-r--r--src/mongo/db/repl/transaction_oplog_application.cpp321
-rw-r--r--src/mongo/db/repl/transaction_oplog_application.h3
8 files changed, 232 insertions, 248 deletions
diff --git a/src/mongo/db/repl/apply_ops.cpp b/src/mongo/db/repl/apply_ops.cpp
index 4f073418500..675573b8a38 100644
--- a/src/mongo/db/repl/apply_ops.cpp
+++ b/src/mongo/db/repl/apply_ops.cpp
@@ -494,14 +494,13 @@ Status applyOps(OperationContext* opCtx,
// static
MultiApplier::Operations ApplyOps::extractOperations(const OplogEntry& applyOpsOplogEntry) {
MultiApplier::Operations result;
- extractOperationsTo(applyOpsOplogEntry, applyOpsOplogEntry.toBSON(), &result, boost::none);
+ extractOperationsTo(applyOpsOplogEntry, applyOpsOplogEntry.toBSON(), &result);
return result;
}
void ApplyOps::extractOperationsTo(const OplogEntry& applyOpsOplogEntry,
const BSONObj& topLevelDoc,
- MultiApplier::Operations* operations,
- boost::optional<Timestamp> commitOplogEntryTS) {
+ MultiApplier::Operations* operations) {
uassert(ErrorCodes::TypeMismatch,
str::stream() << "ApplyOps::extractOperations(): not a command: "
<< redact(applyOpsOplogEntry.toBSON()),
@@ -521,14 +520,8 @@ void ApplyOps::extractOperationsTo(const OplogEntry& applyOpsOplogEntry,
for (const auto& elem : operationDocs) {
auto operationDoc = elem.Obj();
- BSONObjBuilder builder(operationDoc);
-
- // Apply the ts field first if we have a commitOplogEntryTS so that appendElementsUnique
- // will not overwrite this value.
- if (commitOplogEntryTS) {
- builder.append("ts", *commitOplogEntryTS);
- }
+ BSONObjBuilder builder(operationDoc);
builder.appendElementsUnique(topLevelDoc);
auto operation = builder.obj();
diff --git a/src/mongo/db/repl/apply_ops.h b/src/mongo/db/repl/apply_ops.h
index 8aac61a39b9..7489c94a486 100644
--- a/src/mongo/db/repl/apply_ops.h
+++ b/src/mongo/db/repl/apply_ops.h
@@ -55,16 +55,12 @@ public:
/**
* This variant allows optimization for extracting multiple applyOps operations. The entry for
* the non-DurableReplOperation fields of the extracted operation must be specified as
- * 'topLevelDoc', and need not be any of the applyOps operations.
- *
- * If a commitOplogEntryTS Timestamp is passed in, then we are extracting applyOps operations
- * from a prepare oplog entry during initial sync. These operations must be timestamped at the
- * commit oplog entry timestamp instead of the prepareTimestamp.
+ * 'topLevelDoc', and need not be any of the applyOps operations. The 'topLevelDoc' entry's
+ * 'ts' field will be used as the 'ts' field for each operation.
*/
static void extractOperationsTo(const OplogEntry& applyOpsOplogEntry,
const BSONObj& topLevelDoc,
- MultiApplier::Operations* operations,
- boost::optional<Timestamp> commitOplogEntryTS);
+ MultiApplier::Operations* operations);
};
/**
diff --git a/src/mongo/db/repl/idempotency_test_fixture.cpp b/src/mongo/db/repl/idempotency_test_fixture.cpp
index f6c26167f4a..b6cf0d36764 100644
--- a/src/mongo/db/repl/idempotency_test_fixture.cpp
+++ b/src/mongo/db/repl/idempotency_test_fixture.cpp
@@ -388,8 +388,17 @@ void IdempotencyTest::testOpsAreIdempotent(std::vector<OplogEntry> ops, Sequence
SyncTailTest::makeInitialSyncOptions());
std::vector<MultiApplier::OperationPtrs> writerVectors(1);
std::vector<MultiApplier::Operations> derivedOps;
- // Derive ops for transactions if necessary.
- syncTail.fillWriterVectors(_opCtx.get(), &ops, &writerVectors, &derivedOps);
+
+ // Keeps all operations in scope for the lifetime of this function.
+ std::vector<MultiApplier::Operations> singleOpVectors;
+ for (auto&& entry : ops) {
+ // Derive ops for transactions if necessary.
+ std::vector<OplogEntry> op;
+ op.push_back(entry);
+ singleOpVectors.emplace_back(op);
+ syncTail.fillWriterVectors(
+ _opCtx.get(), &singleOpVectors.back(), &writerVectors, &derivedOps);
+ }
const auto& opPtrs = writerVectors[0];
ASSERT_OK(runOpPtrsInitialSync(opPtrs));
diff --git a/src/mongo/db/repl/oplog_entry.h b/src/mongo/db/repl/oplog_entry.h
index 0e222aa1bc8..a672e73f333 100644
--- a/src/mongo/db/repl/oplog_entry.h
+++ b/src/mongo/db/repl/oplog_entry.h
@@ -201,6 +201,23 @@ public:
}
/**
+ * Returns if this is a prepared 'commitTransaction' oplog entry.
+ */
+ bool isPreparedCommit() const {
+ return getCommandType() == OplogEntry::CommandType::kCommitTransaction;
+ }
+
+ /**
+ * Returns whether the oplog entry represents an applyOps which is a self-contained atomic
+ * operation, or the last applyOps of an unprepared transaction, as opposed to part of a
+ * prepared transaction or a non-final applyOps in a transaction.
+ */
+ bool isTerminalApplyOps() const {
+ return getCommandType() == OplogEntry::CommandType::kApplyOps && !shouldPrepare() &&
+ !isPartialTransaction() && !getObject().getBoolField("prepare");
+ }
+
+ /**
* Returns if the oplog entry is for a CRUD operation.
*/
static bool isCrudOpType(OpTypeEnum opType);
diff --git a/src/mongo/db/repl/sync_tail.cpp b/src/mongo/db/repl/sync_tail.cpp
index 7efe5ee10b4..6938895799c 100644
--- a/src/mongo/db/repl/sync_tail.cpp
+++ b/src/mongo/db/repl/sync_tail.cpp
@@ -869,20 +869,6 @@ void SyncTail::_oplogApplication(ReplicationCoordinator* replCoord,
}
}
-// Returns whether an oplog entry represents an applyOps which is a self-contained atomic operation,
-// or the last applyOps of an unprepared transaction, as opposed to part of a prepared transaction
-// or a non-final applyOps in an transaction.
-inline bool isCommitApplyOps(const OplogEntry& entry) {
- return entry.getCommandType() == OplogEntry::CommandType::kApplyOps && !entry.shouldPrepare() &&
- !entry.isPartialTransaction() && !entry.getObject().getBoolField("prepare");
-}
-
-// Returns whether a commitTransaction oplog entry is a part of a prepared transaction.
-inline bool isPreparedCommit(const OplogEntry& entry) {
- return entry.getCommandType() == OplogEntry::CommandType::kCommitTransaction;
-}
-
-
void SyncTail::shutdown() {
stdx::lock_guard<stdx::mutex> lock(_mutex);
_inShutdown = true;
@@ -1147,7 +1133,7 @@ void SyncTail::_fillWriterVectors(OperationContext* opCtx,
MultiApplier::Operations* ops,
std::vector<MultiApplier::OperationPtrs>* writerVectors,
std::vector<MultiApplier::Operations>* derivedOps,
- SessionUpdateTracker* sessionUpdateTracker) {
+ SessionUpdateTracker* sessionUpdateTracker) noexcept {
const auto serviceContext = opCtx->getServiceContext();
const auto storageEngine = serviceContext->getStorageEngine();
@@ -1182,7 +1168,9 @@ void SyncTail::_fillWriterVectors(OperationContext* opCtx,
// If this entry is part of a multi-oplog-entry transaction, ignore it until the commit.
// We must save it here because we are not guaranteed it has been written to the oplog
// yet.
- if (op.isPartialTransaction()) {
+ // We also do this for prepare during initial sync.
+ if (op.isPartialTransaction() ||
+ (op.shouldPrepare() && _options.mode == OplogApplication::Mode::kInitialSync)) {
auto& partialTxnList = partialTxnOps[*op.getSessionId()];
// If this operation belongs to an existing partial transaction, partialTxnList
// must contain the previous operations of the transaction.
@@ -1222,43 +1210,29 @@ void SyncTail::_fillWriterVectors(OperationContext* opCtx,
// Extract applyOps operations and fill writers with extracted operations using this
// function.
- if (isCommitApplyOps(op)) {
- try {
- auto logicalSessionId = op.getSessionId();
- // applyOps entries generated by a transaction must have a sessionId and a
- // transaction number.
- if (logicalSessionId && op.getTxnNumber()) {
- // On commit of unprepared transactions, get transactional operations from the
- // oplog and fill writers with those operations.
- // Flush partialTxnList operations for current transaction.
- auto& partialTxnList = partialTxnOps[*logicalSessionId];
- {
- // We need to use a ReadSourceScope avoid the reads of the transaction
- // messing up the state of the opCtx. In particular we do not want to
- // set the ReadSource to kLastApplied.
- ReadSourceScope readSourceScope(opCtx);
- derivedOps->emplace_back(readTransactionOperationsFromOplogChain(
- opCtx, op, partialTxnList, boost::none));
- partialTxnList.clear();
- }
- // Transaction entries cannot have different session updates.
- _fillWriterVectors(
- opCtx, &derivedOps->back(), writerVectors, derivedOps, nullptr);
- } else {
- // The applyOps entry was not generated as part of a transaction.
- invariant(!op.getPrevWriteOpTimeInTransaction());
- derivedOps->emplace_back(ApplyOps::extractOperations(op));
+ if (op.isTerminalApplyOps()) {
+ auto logicalSessionId = op.getSessionId();
+ // applyOps entries generated by a transaction must have a sessionId and a
+ // transaction number.
+ if (logicalSessionId && op.getTxnNumber()) {
+ // On commit of unprepared transactions, get transactional operations from the
+ // oplog and fill writers with those operations.
+ // Flush partialTxnList operations for current transaction.
+ auto& partialTxnList = partialTxnOps[*logicalSessionId];
+
+ derivedOps->emplace_back(
+ readTransactionOperationsFromOplogChain(opCtx, op, partialTxnList));
+ partialTxnList.clear();
- // Nested entries cannot have different session updates.
- _fillWriterVectors(
- opCtx, &derivedOps->back(), writerVectors, derivedOps, nullptr);
- }
- } catch (...) {
- fassertFailedWithStatusNoTrace(
- 50711,
- exceptionToStatus().withContext(str::stream()
- << "Unable to extract operations from applyOps "
- << redact(op.toBSON())));
+ // Transaction entries cannot have different session updates.
+ _fillWriterVectors(opCtx, &derivedOps->back(), writerVectors, derivedOps, nullptr);
+ } else {
+ // The applyOps entry was not generated as part of a transaction.
+ invariant(!op.getPrevWriteOpTimeInTransaction());
+ derivedOps->emplace_back(ApplyOps::extractOperations(op));
+
+ // Nested entries cannot have different session updates.
+ _fillWriterVectors(opCtx, &derivedOps->back(), writerVectors, derivedOps, nullptr);
}
continue;
}
@@ -1266,24 +1240,13 @@ void SyncTail::_fillWriterVectors(OperationContext* opCtx,
// If we see a commitTransaction command that is a part of a prepared transaction during
// initial sync, find the prepare oplog entry, extract applyOps operations, and fill writers
// with the extracted operations.
- if (isPreparedCommit(op) && (_options.mode == OplogApplication::Mode::kInitialSync)) {
+ if (op.isPreparedCommit() && (_options.mode == OplogApplication::Mode::kInitialSync)) {
auto logicalSessionId = op.getSessionId();
auto& partialTxnList = partialTxnOps[*logicalSessionId];
- {
- // Traverse the oplog chain with its own snapshot and read timestamp.
- ReadSourceScope readSourceScope(opCtx);
-
- // Get the previous oplog entry, which should be a prepare oplog entry.
- const auto prevOplogEntry = getPreviousOplogEntry(opCtx, op);
- invariant(prevOplogEntry.shouldPrepare());
-
- // Extract the operations from the applyOps entry.
- auto commitOplogEntryOpTime = op.getOpTime();
- derivedOps->emplace_back(readTransactionOperationsFromOplogChain(
- opCtx, prevOplogEntry, partialTxnList, commitOplogEntryOpTime.getTimestamp()));
- partialTxnList.clear();
- }
+ derivedOps->emplace_back(
+ readTransactionOperationsFromOplogChain(opCtx, op, partialTxnList));
+ partialTxnList.clear();
_fillWriterVectors(opCtx, &derivedOps->back(), writerVectors, derivedOps, nullptr);
continue;
@@ -1300,7 +1263,7 @@ void SyncTail::_fillWriterVectors(OperationContext* opCtx,
void SyncTail::fillWriterVectors(OperationContext* opCtx,
MultiApplier::Operations* ops,
std::vector<MultiApplier::OperationPtrs>* writerVectors,
- std::vector<MultiApplier::Operations>* derivedOps) {
+ std::vector<MultiApplier::Operations>* derivedOps) noexcept {
SessionUpdateTracker sessionUpdateTracker;
_fillWriterVectors(opCtx, ops, writerVectors, derivedOps, &sessionUpdateTracker);
diff --git a/src/mongo/db/repl/sync_tail.h b/src/mongo/db/repl/sync_tail.h
index db036ac771a..c4923544c29 100644
--- a/src/mongo/db/repl/sync_tail.h
+++ b/src/mongo/db/repl/sync_tail.h
@@ -241,7 +241,7 @@ public:
void fillWriterVectors(OperationContext* opCtx,
MultiApplier::Operations* ops,
std::vector<MultiApplier::OperationPtrs>* writerVectors,
- std::vector<MultiApplier::Operations>* derivedOps);
+ std::vector<MultiApplier::Operations>* derivedOps) noexcept;
private:
class OpQueueBatcher;
@@ -252,7 +252,7 @@ private:
MultiApplier::Operations* ops,
std::vector<MultiApplier::OperationPtrs>* writerVectors,
std::vector<MultiApplier::Operations>* derivedOps,
- SessionUpdateTracker* sessionUpdateTracker);
+ SessionUpdateTracker* sessionUpdateTracker) noexcept;
/**
* Doles out all the work to the writer pool threads. Does not modify writerVectors, but passes
diff --git a/src/mongo/db/repl/transaction_oplog_application.cpp b/src/mongo/db/repl/transaction_oplog_application.cpp
index 368e8c2200a..eb2a44af338 100644
--- a/src/mongo/db/repl/transaction_oplog_application.cpp
+++ b/src/mongo/db/repl/transaction_oplog_application.cpp
@@ -83,26 +83,17 @@ Status _applyOperationsForTransaction(OperationContext* opCtx,
* Helper that will read the entire sequence of oplog entries for the transaction and apply each of
* them.
*
- * Currently used for oplog application of a commitTransaction oplog entry during recovery, rollback
- * and initial sync.
+ * Currently used for oplog application of a commitTransaction oplog entry during recovery and
+ * rollback.
*/
Status _applyTransactionFromOplogChain(OperationContext* opCtx,
const OplogEntry& entry,
repl::OplogApplication::Mode mode,
Timestamp commitTimestamp,
Timestamp durableTimestamp) {
- invariant(mode == repl::OplogApplication::Mode::kRecovering ||
- mode == repl::OplogApplication::Mode::kInitialSync);
+ invariant(mode == repl::OplogApplication::Mode::kRecovering);
- repl::MultiApplier::Operations ops;
- {
- // Traverse the oplog chain with its own snapshot and read timestamp.
- ReadSourceScope readSourceScope(opCtx);
-
- // Get the corresponding prepare applyOps oplog entry.
- const auto prepareOplogEntry = getPreviousOplogEntry(opCtx, entry);
- ops = readTransactionOperationsFromOplogChain(opCtx, prepareOplogEntry, {}, boost::none);
- }
+ auto ops = readTransactionOperationsFromOplogChain(opCtx, entry, {});
const auto dbName = entry.getNss().db().toString();
Status status = Status::OK();
@@ -151,107 +142,129 @@ const repl::OplogEntry getPreviousOplogEntry(OperationContext* opCtx,
Status applyCommitTransaction(OperationContext* opCtx,
const OplogEntry& entry,
repl::OplogApplication::Mode mode) {
- // Return error if run via applyOps command.
- uassert(50987,
- "commitTransaction is only used internally by secondaries.",
- mode != repl::OplogApplication::Mode::kApplyOpsCmd);
-
IDLParserErrorContext ctx("commitTransaction");
auto commitOplogEntryOpTime = entry.getOpTime();
auto commitCommand = CommitTransactionOplogObject::parse(ctx, entry.getObject());
invariant(commitCommand.getCommitTimestamp());
- if (mode == repl::OplogApplication::Mode::kRecovering) {
- return _applyTransactionFromOplogChain(opCtx,
- entry,
- mode,
- *commitCommand.getCommitTimestamp(),
- commitOplogEntryOpTime.getTimestamp());
+ switch (mode) {
+ case repl::OplogApplication::Mode::kRecovering: {
+ return _applyTransactionFromOplogChain(opCtx,
+ entry,
+ mode,
+ *commitCommand.getCommitTimestamp(),
+ commitOplogEntryOpTime.getTimestamp());
+ }
+ case repl::OplogApplication::Mode::kInitialSync: {
+ // Initial sync should never apply 'commitTransaction' since it unpacks committed
+ // transactions onto various applier threads.
+ MONGO_UNREACHABLE;
+ }
+ case repl::OplogApplication::Mode::kApplyOpsCmd: {
+ // Return error if run via applyOps command.
+ uasserted(50987, "commitTransaction is only used internally by secondaries.");
+ }
+ case repl::OplogApplication::Mode::kSecondary: {
+ // Transaction operations are in its own batch, so we can modify their opCtx.
+ invariant(entry.getSessionId());
+ invariant(entry.getTxnNumber());
+ opCtx->setLogicalSessionId(*entry.getSessionId());
+ opCtx->setTxnNumber(*entry.getTxnNumber());
+ opCtx->setInMultiDocumentTransaction();
+
+ // The write on transaction table may be applied concurrently, so refreshing state
+ // from disk may read that write, causing starting a new transaction on an existing
+ // txnNumber. Thus, we start a new transaction without refreshing state from disk.
+ MongoDOperationContextSessionWithoutRefresh sessionCheckout(opCtx);
+
+ auto transaction = TransactionParticipant::get(opCtx);
+ invariant(transaction);
+ transaction.unstashTransactionResources(opCtx, "commitTransaction");
+ transaction.commitPreparedTransaction(
+ opCtx, *commitCommand.getCommitTimestamp(), commitOplogEntryOpTime);
+ return Status::OK();
+ }
}
-
- invariant(mode == repl::OplogApplication::Mode::kSecondary);
-
- // Transaction operations are in its own batch, so we can modify their opCtx.
- invariant(entry.getSessionId());
- invariant(entry.getTxnNumber());
- opCtx->setLogicalSessionId(*entry.getSessionId());
- opCtx->setTxnNumber(*entry.getTxnNumber());
- opCtx->setInMultiDocumentTransaction();
-
- // The write on transaction table may be applied concurrently, so refreshing state
- // from disk may read that write, causing starting a new transaction on an existing
- // txnNumber. Thus, we start a new transaction without refreshing state from disk.
- MongoDOperationContextSessionWithoutRefresh sessionCheckout(opCtx);
-
- auto transaction = TransactionParticipant::get(opCtx);
- invariant(transaction);
- transaction.unstashTransactionResources(opCtx, "commitTransaction");
- transaction.commitPreparedTransaction(
- opCtx, *commitCommand.getCommitTimestamp(), commitOplogEntryOpTime);
- return Status::OK();
+ MONGO_UNREACHABLE;
}
Status applyAbortTransaction(OperationContext* opCtx,
const OplogEntry& entry,
repl::OplogApplication::Mode mode) {
- // Return error if run via applyOps command.
- uassert(50972,
- "abortTransaction is only used internally by secondaries.",
- mode != repl::OplogApplication::Mode::kApplyOpsCmd);
-
- // We don't put transactions into the prepare state until the end of recovery and initial sync,
- // so there is no transaction to abort.
- if (mode == repl::OplogApplication::Mode::kRecovering ||
- mode == repl::OplogApplication::Mode::kInitialSync) {
- return Status::OK();
+ switch (mode) {
+ case repl::OplogApplication::Mode::kRecovering: {
+ // We don't put transactions into the prepare state until the end of recovery,
+ // so there is no transaction to abort.
+ return Status::OK();
+ }
+ case repl::OplogApplication::Mode::kInitialSync: {
+ // We don't put transactions into the prepare state until the end of initial sync,
+ // so there is no transaction to abort.
+ return Status::OK();
+ }
+ case repl::OplogApplication::Mode::kApplyOpsCmd: {
+ // Return error if run via applyOps command.
+ uasserted(50972, "abortTransaction is only used internally by secondaries.");
+ }
+ case repl::OplogApplication::Mode::kSecondary: {
+ // Transaction operations are in its own batch, so we can modify their opCtx.
+ invariant(entry.getSessionId());
+ invariant(entry.getTxnNumber());
+ opCtx->setLogicalSessionId(*entry.getSessionId());
+ opCtx->setTxnNumber(*entry.getTxnNumber());
+ opCtx->setInMultiDocumentTransaction();
+
+ // The write on transaction table may be applied concurrently, so refreshing state
+ // from disk may read that write, causing starting a new transaction on an existing
+ // txnNumber. Thus, we start a new transaction without refreshing state from disk.
+ MongoDOperationContextSessionWithoutRefresh sessionCheckout(opCtx);
+
+ auto transaction = TransactionParticipant::get(opCtx);
+ transaction.unstashTransactionResources(opCtx, "abortTransaction");
+ transaction.abortTransaction(opCtx);
+ return Status::OK();
+ }
}
-
- invariant(mode == repl::OplogApplication::Mode::kSecondary);
-
- // Transaction operations are in its own batch, so we can modify their opCtx.
- invariant(entry.getSessionId());
- invariant(entry.getTxnNumber());
- opCtx->setLogicalSessionId(*entry.getSessionId());
- opCtx->setTxnNumber(*entry.getTxnNumber());
- opCtx->setInMultiDocumentTransaction();
-
- // The write on transaction table may be applied concurrently, so refreshing state
- // from disk may read that write, causing starting a new transaction on an existing
- // txnNumber. Thus, we start a new transaction without refreshing state from disk.
- MongoDOperationContextSessionWithoutRefresh sessionCheckout(opCtx);
-
- auto transaction = TransactionParticipant::get(opCtx);
- transaction.unstashTransactionResources(opCtx, "abortTransaction");
- transaction.abortTransaction(opCtx);
- return Status::OK();
+ MONGO_UNREACHABLE;
}
repl::MultiApplier::Operations readTransactionOperationsFromOplogChain(
OperationContext* opCtx,
- const OplogEntry& commitOrPrepare,
- const std::vector<OplogEntry*>& cachedOps,
- boost::optional<Timestamp> commitOplogEntryTS) {
- repl::MultiApplier::Operations ops;
+ const OplogEntry& lastEntryInTxn,
+ const std::vector<OplogEntry*>& cachedOps) noexcept {
+ // Traverse the oplog chain with its own snapshot and read timestamp.
+ ReadSourceScope readSourceScope(opCtx);
- // Get the previous oplog entry.
- auto currentOpTime = commitOrPrepare.getOpTime();
+ repl::MultiApplier::Operations ops;
// The cachedOps are the ops for this transaction that are from the same oplog application batch
// as the commit or prepare, those which have not necessarily been written to the oplog. These
// ops are in order of increasing timestamp.
+ const auto oldestEntryInBatch = cachedOps.empty() ? lastEntryInTxn : *cachedOps.front();
- // The lastEntryOpTime is the OpTime of the last (latest OpTime) entry for this transaction
+ // The lastEntryWrittenToOplogOpTime is the OpTime of the latest entry for this transaction
// which is expected to be present in the oplog. It is the entry before the first cachedOp,
// unless there are no cachedOps in which case it is the entry before the commit or prepare.
- const auto lastEntryOpTime = (cachedOps.empty() ? commitOrPrepare : *cachedOps.front())
- .getPrevWriteOpTimeInTransaction();
- invariant(lastEntryOpTime < currentOpTime);
+ const auto lastEntryWrittenToOplogOpTime = oldestEntryInBatch.getPrevWriteOpTimeInTransaction();
+ invariant(lastEntryWrittenToOplogOpTime < lastEntryInTxn.getOpTime());
+
+ TransactionHistoryIterator iter(lastEntryWrittenToOplogOpTime.get());
+
+ // If we started with a prepared commit, we want to forget about that operation and move onto
+ // the prepare.
+ auto prepareOrUnpreparedCommit = lastEntryInTxn;
+ if (lastEntryInTxn.isPreparedCommit()) {
+ // A prepared-commit must be in its own batch and thus have no cached ops.
+ invariant(cachedOps.empty());
+ invariant(iter.hasNext());
+ prepareOrUnpreparedCommit = iter.nextFatalOnErrors(opCtx);
+ }
+ invariant(prepareOrUnpreparedCommit.getCommandType() == OplogEntry::CommandType::kApplyOps);
- TransactionHistoryIterator iter(lastEntryOpTime.get());
- // Empty commits are not allowed, but empty prepares are.
- invariant(commitOrPrepare.getCommandType() != OplogEntry::CommandType::kCommitTransaction ||
- !cachedOps.empty() || iter.hasNext());
- auto commitOrPrepareObj = commitOrPrepare.toBSON();
+ // The non-DurableReplOperation fields of the extracted transaction operations will match those
+ // of the lastEntryInTxn. For a prepared commit, this will include the commit oplog entry's
+ // 'ts' field, which is what we want.
+ auto lastEntryInTxnObj = lastEntryInTxn.toBSON();
// First retrieve and transform the ops from the oplog, which will be retrieved in reverse
// order.
@@ -259,8 +272,8 @@ repl::MultiApplier::Operations readTransactionOperationsFromOplogChain(
const auto& operationEntry = iter.nextFatalOnErrors(opCtx);
invariant(operationEntry.isPartialTransaction());
auto prevOpsEnd = ops.size();
- repl::ApplyOps::extractOperationsTo(
- operationEntry, commitOrPrepareObj, &ops, commitOplogEntryTS);
+ repl::ApplyOps::extractOperationsTo(operationEntry, lastEntryInTxnObj, &ops);
+
// Because BSONArrays do not have fast way of determining size without iterating through
// them, and we also have no way of knowing how many oplog entries are in a transaction
// without iterating, reversing each applyOps and then reversing the whole array is
@@ -275,15 +288,11 @@ repl::MultiApplier::Operations readTransactionOperationsFromOplogChain(
for (auto* cachedOp : cachedOps) {
const auto& operationEntry = *cachedOp;
invariant(operationEntry.isPartialTransaction());
- repl::ApplyOps::extractOperationsTo(
- operationEntry, commitOrPrepareObj, &ops, commitOplogEntryTS);
+ repl::ApplyOps::extractOperationsTo(operationEntry, lastEntryInTxnObj, &ops);
}
- // Reconstruct the operations from the commit or prepare oplog entry.
- if (commitOrPrepare.getCommandType() == OplogEntry::CommandType::kApplyOps) {
- repl::ApplyOps::extractOperationsTo(
- commitOrPrepare, commitOrPrepareObj, &ops, commitOplogEntryTS);
- }
+ // Reconstruct the operations from the prepare or unprepared commit oplog entry.
+ repl::ApplyOps::extractOperationsTo(prepareOrUnpreparedCommit, lastEntryInTxnObj, &ops);
return ops;
}
@@ -294,18 +303,15 @@ namespace {
*/
Status _applyPrepareTransaction(OperationContext* opCtx,
const OplogEntry& entry,
- repl::OplogApplication::Mode oplogApplicationMode) {
+ repl::OplogApplication::Mode mode) {
// The operations here are reconstructed at their prepare time. However, that time will
// be ignored because there is an outer write unit of work during their application.
// The prepare time of the transaction is set explicitly below.
- auto ops = [&] {
- ReadSourceScope readSourceScope(opCtx);
- return readTransactionOperationsFromOplogChain(opCtx, entry, {}, boost::none);
- }();
+ auto ops = readTransactionOperationsFromOplogChain(opCtx, entry, {});
- if (oplogApplicationMode == repl::OplogApplication::Mode::kRecovering ||
- oplogApplicationMode == repl::OplogApplication::Mode::kInitialSync) {
+ if (mode == repl::OplogApplication::Mode::kRecovering ||
+ mode == repl::OplogApplication::Mode::kInitialSync) {
// We might replay a prepared transaction behind oldest timestamp. Note that since this is
// scoped to the storage transaction, and readTransactionOperationsFromOplogChain implicitly
// abandons the storage transaction when it releases the global lock, this must be done
@@ -346,11 +352,11 @@ Status _applyPrepareTransaction(OperationContext* opCtx,
// Set this in case the application of any ops need to use the prepare timestamp of this
// transaction. It should be cleared automatically when the transaction finishes.
- if (oplogApplicationMode == repl::OplogApplication::Mode::kRecovering) {
+ if (mode == repl::OplogApplication::Mode::kRecovering) {
transaction.setPrepareOpTimeForRecovery(opCtx, entry.getOpTime());
}
- auto status = _applyOperationsForTransaction(opCtx, ops, oplogApplicationMode);
+ auto status = _applyOperationsForTransaction(opCtx, ops, mode);
fassert(31137, status);
if (MONGO_FAIL_POINT(applyOpsHangBeforePreparingTransaction)) {
@@ -366,54 +372,70 @@ Status _applyPrepareTransaction(OperationContext* opCtx,
}
/**
- * Apply a prepared transaction during recovery.
+ * Apply a prepared transaction when we are reconstructing prepared transactions.
*/
-Status applyRecoveredPrepareTransaction(OperationContext* opCtx,
- const OplogEntry& entry,
- repl::OplogApplication::Mode mode) {
- // Snapshot transactions never conflict with the PBWM lock.
- invariant(!opCtx->lockState()->shouldConflictWithSecondaryBatchApplication());
+void _reconstructPreparedTransaction(OperationContext* opCtx,
+ const OplogEntry& prepareEntry,
+ repl::OplogApplication::Mode mode) {
+ repl::UnreplicatedWritesBlock uwb(opCtx);
+
+ // Snapshot transaction can never conflict with the PBWM lock.
+ opCtx->lockState()->setShouldConflictWithSecondaryBatchApplication(false);
+
+ // When querying indexes, we return the record matching the key if it exists, or an
+ // adjacent document. This means that it is possible for us to hit a prepare conflict if
+ // we query for an incomplete key and an adjacent key is prepared.
+ // We ignore prepare conflicts on recovering nodes because they may encounter prepare
+ // conflicts that did not occur on the primary.
+ opCtx->recoveryUnit()->setPrepareConflictBehavior(
+ PrepareConflictBehavior::kIgnoreConflictsAllowWrites);
+
// We might replay a prepared transaction behind oldest timestamp.
opCtx->recoveryUnit()->setRoundUpPreparedTimestamps(true);
- return _applyPrepareTransaction(opCtx, entry, mode);
+
+ // Checks out the session, applies the operations and prepares the transaction.
+ uassertStatusOK(_applyPrepareTransaction(opCtx, prepareEntry, mode));
}
} // namespace
/**
- * Make sure that if we are in replication recovery or initial sync, we don't apply the prepare
- * transaction oplog entry until we either see a commit transaction oplog entry or are at the very
- * end of recovery/initial sync. Otherwise, only apply the prepare transaction oplog entry if we are
- * a secondary.
+ * Make sure that if we are in replication recovery, we don't apply the prepare transaction oplog
+ * entry until we either see a commit transaction oplog entry or are at the very end of recovery.
+ * Otherwise, only apply the prepare transaction oplog entry if we are a secondary. We shouldn't get
+ * here for initial sync and applyOps should error.
*/
Status applyPrepareTransaction(OperationContext* opCtx,
const OplogEntry& entry,
- repl::OplogApplication::Mode oplogApplicationMode) {
- // Don't apply the operations from the prepared transaction until either we see a commit
- // transaction oplog entry during recovery or are at the end of recovery.
- if (oplogApplicationMode == repl::OplogApplication::Mode::kRecovering) {
- if (!serverGlobalParams.enableMajorityReadConcern) {
- error() << "Cannot replay a prepared transaction when 'enableMajorityReadConcern' is "
+ repl::OplogApplication::Mode mode) {
+ switch (mode) {
+ case repl::OplogApplication::Mode::kRecovering: {
+ if (!serverGlobalParams.enableMajorityReadConcern) {
+ error()
+ << "Cannot replay a prepared transaction when 'enableMajorityReadConcern' is "
"set to false. Restart the server with --enableMajorityReadConcern=true "
"to complete recovery.";
- }
- fassert(51146, serverGlobalParams.enableMajorityReadConcern);
- return Status::OK();
- }
+ fassertFailed(51146);
+ }
- // Don't apply the operations from the prepared transaction until either we see a commit
- // transaction oplog entry during the oplog application phase of initial sync or are at the end
- // of initial sync.
- if (oplogApplicationMode == repl::OplogApplication::Mode::kInitialSync) {
- return Status::OK();
+ // Don't apply the operations from the prepared transaction until either we see a commit
+ // transaction oplog entry during recovery or are at the end of recovery.
+ return Status::OK();
+ }
+ case repl::OplogApplication::Mode::kInitialSync: {
+ // Initial sync should never apply 'prepareTransaction' since it unpacks committed
+ // transactions onto various applier threads at commit time.
+ MONGO_UNREACHABLE;
+ }
+ case repl::OplogApplication::Mode::kApplyOpsCmd: {
+ // Return error if run via applyOps command.
+ uasserted(51145,
+ "prepare applyOps oplog entry is only used internally by secondaries.");
+ }
+ case repl::OplogApplication::Mode::kSecondary: {
+ return _applyPrepareTransaction(opCtx, entry, repl::OplogApplication::Mode::kSecondary);
+ }
}
-
- // Return error if run via applyOps command.
- uassert(51145,
- "prepare applyOps oplog entry is only used internally by secondaries.",
- oplogApplicationMode != repl::OplogApplication::Mode::kApplyOpsCmd);
-
- invariant(oplogApplicationMode == repl::OplogApplication::Mode::kSecondary);
- return _applyPrepareTransaction(opCtx, entry, oplogApplicationMode);
+ MONGO_UNREACHABLE;
}
void reconstructPreparedTransactions(OperationContext* opCtx, repl::OplogApplication::Mode mode) {
@@ -454,22 +476,7 @@ void reconstructPreparedTransactions(OperationContext* opCtx, repl::OplogApplica
opCtx->getServiceContext()->makeClient("reconstruct-prepared-transactions");
AlternativeClientRegion acr(newClient);
const auto newOpCtx = cc().makeOperationContext();
- repl::UnreplicatedWritesBlock uwb(newOpCtx.get());
-
- // Snapshot transaction can never conflict with the PBWM lock.
- newOpCtx->lockState()->setShouldConflictWithSecondaryBatchApplication(false);
-
- // When querying indexes, we return the record matching the key if it exists, or an
- // adjacent document. This means that it is possible for us to hit a prepare conflict if
- // we query for an incomplete key and an adjacent key is prepared.
- // We ignore prepare conflicts on recovering nodes because they may encounter prepare
- // conflicts that did not occur on the primary.
- newOpCtx->recoveryUnit()->setPrepareConflictBehavior(
- PrepareConflictBehavior::kIgnoreConflictsAllowWrites);
-
- // Checks out the session, applies the operations and prepares the transaction.
- uassertStatusOK(
- applyRecoveredPrepareTransaction(newOpCtx.get(), prepareOplogEntry, mode));
+ _reconstructPreparedTransaction(newOpCtx.get(), prepareOplogEntry, mode);
}
}
}
diff --git a/src/mongo/db/repl/transaction_oplog_application.h b/src/mongo/db/repl/transaction_oplog_application.h
index 5ad20f48875..ad85ccca6fa 100644
--- a/src/mongo/db/repl/transaction_oplog_application.h
+++ b/src/mongo/db/repl/transaction_oplog_application.h
@@ -63,8 +63,7 @@ const repl::OplogEntry getPreviousOplogEntry(OperationContext* opCtx,
repl::MultiApplier::Operations readTransactionOperationsFromOplogChain(
OperationContext* opCtx,
const repl::OplogEntry& entry,
- const std::vector<repl::OplogEntry*>& cachedOps,
- boost::optional<Timestamp> commitOplogEntryTS);
+ const std::vector<repl::OplogEntry*>& cachedOps) noexcept;
/**
* Apply `prepareTransaction` oplog entry.