summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
author Judah Schvimer <judah@mongodb.com> 2019-08-09 10:13:07 -0400
committer Judah Schvimer <judah@mongodb.com> 2019-08-09 10:13:07 -0400
commit f14d9c4c07e69c2109de0af059123060c73cdd77 (patch)
tree 846d6bef2b6f1cbbbac30a741af6484028371012
parent 181ad8eeaaf0a0c636713699f8e110a3e94af125 (diff)
download mongo-f14d9c4c07e69c2109de0af059123060c73cdd77.tar.gz
SERVER-41437 unify transaction oplog traversal
-rw-r--r-- src/mongo/db/repl/apply_ops.cpp 13
-rw-r--r-- src/mongo/db/repl/apply_ops.h 10
-rw-r--r-- src/mongo/db/repl/idempotency_test_fixture.cpp 13
-rw-r--r-- src/mongo/db/repl/oplog_entry.h 17
-rw-r--r-- src/mongo/db/repl/sync_tail.cpp 101
-rw-r--r-- src/mongo/db/repl/sync_tail.h 4
-rw-r--r-- src/mongo/db/repl/transaction_oplog_application.cpp 85
-rw-r--r-- src/mongo/db/repl/transaction_oplog_application.h 3
8 files changed, 109 insertions, 137 deletions
diff --git a/src/mongo/db/repl/apply_ops.cpp b/src/mongo/db/repl/apply_ops.cpp
index 76f87222aad..7d07bd007de 100644
--- a/src/mongo/db/repl/apply_ops.cpp
+++ b/src/mongo/db/repl/apply_ops.cpp
@@ -485,14 +485,13 @@ Status applyOps(OperationContext* opCtx,
// static
MultiApplier::Operations ApplyOps::extractOperations(const OplogEntry& applyOpsOplogEntry) {
MultiApplier::Operations result;
- extractOperationsTo(applyOpsOplogEntry, applyOpsOplogEntry.toBSON(), &result, boost::none);
+ extractOperationsTo(applyOpsOplogEntry, applyOpsOplogEntry.toBSON(), &result);
return result;
}
void ApplyOps::extractOperationsTo(const OplogEntry& applyOpsOplogEntry,
const BSONObj& topLevelDoc,
- MultiApplier::Operations* operations,
- boost::optional<Timestamp> commitOplogEntryTS) {
+ MultiApplier::Operations* operations) {
uassert(ErrorCodes::TypeMismatch,
str::stream() << "ApplyOps::extractOperations(): not a command: "
<< redact(applyOpsOplogEntry.toBSON()),
@@ -512,14 +511,8 @@ void ApplyOps::extractOperationsTo(const OplogEntry& applyOpsOplogEntry,
for (const auto& elem : operationDocs) {
auto operationDoc = elem.Obj();
- BSONObjBuilder builder(operationDoc);
-
- // Apply the ts field first if we have a commitOplogEntryTS so that appendElementsUnique
- // will not overwrite this value.
- if (commitOplogEntryTS) {
- builder.append("ts", *commitOplogEntryTS);
- }
+ BSONObjBuilder builder(operationDoc);
builder.appendElementsUnique(topLevelDoc);
auto operation = builder.obj();
diff --git a/src/mongo/db/repl/apply_ops.h b/src/mongo/db/repl/apply_ops.h
index 8aac61a39b9..7489c94a486 100644
--- a/src/mongo/db/repl/apply_ops.h
+++ b/src/mongo/db/repl/apply_ops.h
@@ -55,16 +55,12 @@ public:
/**
* This variant allows optimization for extracting multiple applyOps operations. The entry for
* the non-DurableReplOperation fields of the extracted operation must be specified as
- * 'topLevelDoc', and need not be any of the applyOps operations.
- *
- * If a commitOplogEntryTS Timestamp is passed in, then we are extracting applyOps operations
- * from a prepare oplog entry during initial sync. These operations must be timestamped at the
- * commit oplog entry timestamp instead of the prepareTimestamp.
+ * 'topLevelDoc', and need not be any of the applyOps operations. The 'topLevelDoc' entry's
+ * 'ts' field will be used as the 'ts' field for each operation.
*/
static void extractOperationsTo(const OplogEntry& applyOpsOplogEntry,
const BSONObj& topLevelDoc,
- MultiApplier::Operations* operations,
- boost::optional<Timestamp> commitOplogEntryTS);
+ MultiApplier::Operations* operations);
};
/**
diff --git a/src/mongo/db/repl/idempotency_test_fixture.cpp b/src/mongo/db/repl/idempotency_test_fixture.cpp
index 8b99c794a90..ce5e8500f4f 100644
--- a/src/mongo/db/repl/idempotency_test_fixture.cpp
+++ b/src/mongo/db/repl/idempotency_test_fixture.cpp
@@ -389,8 +389,17 @@ void IdempotencyTest::testOpsAreIdempotent(std::vector<OplogEntry> ops, Sequence
SyncTailTest::makeInitialSyncOptions());
std::vector<MultiApplier::OperationPtrs> writerVectors(1);
std::vector<MultiApplier::Operations> derivedOps;
- // Derive ops for transactions if necessary.
- syncTail.fillWriterVectors(_opCtx.get(), &ops, &writerVectors, &derivedOps);
+
+ // Keeps all operations in scope for the lifetime of this function.
+ std::vector<MultiApplier::Operations> singleOpVectors;
+ for (auto&& entry : ops) {
+ // Derive ops for transactions if necessary.
+ std::vector<OplogEntry> op;
+ op.push_back(entry);
+ singleOpVectors.emplace_back(op);
+ syncTail.fillWriterVectors(
+ _opCtx.get(), &singleOpVectors.back(), &writerVectors, &derivedOps);
+ }
const auto& opPtrs = writerVectors[0];
ASSERT_OK(runOpPtrsInitialSync(opPtrs));
diff --git a/src/mongo/db/repl/oplog_entry.h b/src/mongo/db/repl/oplog_entry.h
index f1379307027..5ae170a8730 100644
--- a/src/mongo/db/repl/oplog_entry.h
+++ b/src/mongo/db/repl/oplog_entry.h
@@ -268,6 +268,23 @@ public:
}
/**
+ * Returns whether this is the 'commitTransaction' oplog entry of a prepared transaction.
+ */
+ bool isPreparedCommit() const {
+ return getCommandType() == OplogEntry::CommandType::kCommitTransaction;
+ }
+
+ /**
+ * Returns whether the oplog entry represents an applyOps which is a self-contained atomic
+ * operation, or the last applyOps of an unprepared transaction, as opposed to part of a
+ * prepared transaction or a non-final applyOps in a transaction.
+ */
+ bool isTerminalApplyOps() const {
+ return getCommandType() == OplogEntry::CommandType::kApplyOps && !shouldPrepare() &&
+ !isPartialTransaction() && !getObject().getBoolField("prepare");
+ }
+
+ /**
* Returns if the oplog entry is for a CRUD operation.
*/
static bool isCrudOpType(OpTypeEnum opType);
diff --git a/src/mongo/db/repl/sync_tail.cpp b/src/mongo/db/repl/sync_tail.cpp
index 8acfbc9d001..5b291d04739 100644
--- a/src/mongo/db/repl/sync_tail.cpp
+++ b/src/mongo/db/repl/sync_tail.cpp
@@ -827,20 +827,6 @@ void SyncTail::_oplogApplication(ReplicationCoordinator* replCoord,
}
}
-// Returns whether an oplog entry represents an applyOps which is a self-contained atomic operation,
-// or the last applyOps of an unprepared transaction, as opposed to part of a prepared transaction
-// or a non-final applyOps in an transaction.
-inline bool isCommitApplyOps(const OplogEntry& entry) {
- return entry.getCommandType() == OplogEntry::CommandType::kApplyOps && !entry.shouldPrepare() &&
- !entry.isPartialTransaction() && !entry.getObject().getBoolField("prepare");
-}
-
-// Returns whether a commitTransaction oplog entry is a part of a prepared transaction.
-inline bool isPreparedCommit(const OplogEntry& entry) {
- return entry.getCommandType() == OplogEntry::CommandType::kCommitTransaction;
-}
-
-
void SyncTail::shutdown() {
stdx::lock_guard<stdx::mutex> lock(_mutex);
_inShutdown = true;
@@ -1105,7 +1091,7 @@ void SyncTail::_fillWriterVectors(OperationContext* opCtx,
MultiApplier::Operations* ops,
std::vector<MultiApplier::OperationPtrs>* writerVectors,
std::vector<MultiApplier::Operations>* derivedOps,
- SessionUpdateTracker* sessionUpdateTracker) {
+ SessionUpdateTracker* sessionUpdateTracker) noexcept {
const auto serviceContext = opCtx->getServiceContext();
const auto storageEngine = serviceContext->getStorageEngine();
@@ -1140,7 +1126,9 @@ void SyncTail::_fillWriterVectors(OperationContext* opCtx,
// If this entry is part of a multi-oplog-entry transaction, ignore it until the commit.
// We must save it here because we are not guaranteed it has been written to the oplog
// yet.
- if (op.isPartialTransaction()) {
+ // We also do this for prepare oplog entries during initial sync.
+ if (op.isPartialTransaction() ||
+ (op.shouldPrepare() && _options.mode == OplogApplication::Mode::kInitialSync)) {
auto& partialTxnList = partialTxnOps[*op.getSessionId()];
// If this operation belongs to an existing partial transaction, partialTxnList
// must contain the previous operations of the transaction.
@@ -1180,43 +1168,29 @@ void SyncTail::_fillWriterVectors(OperationContext* opCtx,
// Extract applyOps operations and fill writers with extracted operations using this
// function.
- if (isCommitApplyOps(op)) {
- try {
- auto logicalSessionId = op.getSessionId();
- // applyOps entries generated by a transaction must have a sessionId and a
- // transaction number.
- if (logicalSessionId && op.getTxnNumber()) {
- // On commit of unprepared transactions, get transactional operations from the
- // oplog and fill writers with those operations.
- // Flush partialTxnList operations for current transaction.
- auto& partialTxnList = partialTxnOps[*logicalSessionId];
- {
- // We need to use a ReadSourceScope avoid the reads of the transaction
- // messing up the state of the opCtx. In particular we do not want to
- // set the ReadSource to kLastApplied.
- ReadSourceScope readSourceScope(opCtx);
- derivedOps->emplace_back(readTransactionOperationsFromOplogChain(
- opCtx, op, partialTxnList, boost::none));
- partialTxnList.clear();
- }
- // Transaction entries cannot have different session updates.
- _fillWriterVectors(
- opCtx, &derivedOps->back(), writerVectors, derivedOps, nullptr);
- } else {
- // The applyOps entry was not generated as part of a transaction.
- invariant(!op.getPrevWriteOpTimeInTransaction());
- derivedOps->emplace_back(ApplyOps::extractOperations(op));
-
- // Nested entries cannot have different session updates.
- _fillWriterVectors(
- opCtx, &derivedOps->back(), writerVectors, derivedOps, nullptr);
- }
- } catch (...) {
- fassertFailedWithStatusNoTrace(
- 50711,
- exceptionToStatus().withContext(str::stream()
- << "Unable to extract operations from applyOps "
- << redact(op.toBSON())));
+ if (op.isTerminalApplyOps()) {
+ auto logicalSessionId = op.getSessionId();
+ // applyOps entries generated by a transaction must have a sessionId and a
+ // transaction number.
+ if (logicalSessionId && op.getTxnNumber()) {
+ // On commit of unprepared transactions, get transactional operations from the
+ // oplog and fill writers with those operations.
+ // Flush partialTxnList operations for current transaction.
+ auto& partialTxnList = partialTxnOps[*logicalSessionId];
+
+ derivedOps->emplace_back(
+ readTransactionOperationsFromOplogChain(opCtx, op, partialTxnList));
+ partialTxnList.clear();
+
+ // Transaction entries cannot have different session updates.
+ _fillWriterVectors(opCtx, &derivedOps->back(), writerVectors, derivedOps, nullptr);
+ } else {
+ // The applyOps entry was not generated as part of a transaction.
+ invariant(!op.getPrevWriteOpTimeInTransaction());
+ derivedOps->emplace_back(ApplyOps::extractOperations(op));
+
+ // Nested entries cannot have different session updates.
+ _fillWriterVectors(opCtx, &derivedOps->back(), writerVectors, derivedOps, nullptr);
}
continue;
}
@@ -1224,24 +1198,13 @@ void SyncTail::_fillWriterVectors(OperationContext* opCtx,
// If we see a commitTransaction command that is a part of a prepared transaction during
// initial sync, find the prepare oplog entry, extract applyOps operations, and fill writers
// with the extracted operations.
- if (isPreparedCommit(op) && (_options.mode == OplogApplication::Mode::kInitialSync)) {
+ if (op.isPreparedCommit() && (_options.mode == OplogApplication::Mode::kInitialSync)) {
auto logicalSessionId = op.getSessionId();
auto& partialTxnList = partialTxnOps[*logicalSessionId];
- {
- // Traverse the oplog chain with its own snapshot and read timestamp.
- ReadSourceScope readSourceScope(opCtx);
-
- // Get the previous oplog entry, which should be a prepare oplog entry.
- const auto prevOplogEntry = getPreviousOplogEntry(opCtx, op);
- invariant(prevOplogEntry.shouldPrepare());
-
- // Extract the operations from the applyOps entry.
- auto commitOplogEntryOpTime = op.getOpTime();
- derivedOps->emplace_back(readTransactionOperationsFromOplogChain(
- opCtx, prevOplogEntry, partialTxnList, commitOplogEntryOpTime.getTimestamp()));
- partialTxnList.clear();
- }
+ derivedOps->emplace_back(
+ readTransactionOperationsFromOplogChain(opCtx, op, partialTxnList));
+ partialTxnList.clear();
_fillWriterVectors(opCtx, &derivedOps->back(), writerVectors, derivedOps, nullptr);
continue;
@@ -1258,7 +1221,7 @@ void SyncTail::_fillWriterVectors(OperationContext* opCtx,
void SyncTail::fillWriterVectors(OperationContext* opCtx,
MultiApplier::Operations* ops,
std::vector<MultiApplier::OperationPtrs>* writerVectors,
- std::vector<MultiApplier::Operations>* derivedOps) {
+ std::vector<MultiApplier::Operations>* derivedOps) noexcept {
SessionUpdateTracker sessionUpdateTracker;
_fillWriterVectors(opCtx, ops, writerVectors, derivedOps, &sessionUpdateTracker);
diff --git a/src/mongo/db/repl/sync_tail.h b/src/mongo/db/repl/sync_tail.h
index 6a156a630aa..a2ab581d75c 100644
--- a/src/mongo/db/repl/sync_tail.h
+++ b/src/mongo/db/repl/sync_tail.h
@@ -227,7 +227,7 @@ public:
void fillWriterVectors(OperationContext* opCtx,
MultiApplier::Operations* ops,
std::vector<MultiApplier::OperationPtrs>* writerVectors,
- std::vector<MultiApplier::Operations>* derivedOps);
+ std::vector<MultiApplier::Operations>* derivedOps) noexcept;
private:
class OpQueueBatcher;
@@ -238,7 +238,7 @@ private:
MultiApplier::Operations* ops,
std::vector<MultiApplier::OperationPtrs>* writerVectors,
std::vector<MultiApplier::Operations>* derivedOps,
- SessionUpdateTracker* sessionUpdateTracker);
+ SessionUpdateTracker* sessionUpdateTracker) noexcept;
/**
* Doles out all the work to the writer pool threads. Does not modify writerVectors, but passes
diff --git a/src/mongo/db/repl/transaction_oplog_application.cpp b/src/mongo/db/repl/transaction_oplog_application.cpp
index fe2ee9fbc18..d06b41c2643 100644
--- a/src/mongo/db/repl/transaction_oplog_application.cpp
+++ b/src/mongo/db/repl/transaction_oplog_application.cpp
@@ -93,15 +93,7 @@ Status _applyTransactionFromOplogChain(OperationContext* opCtx,
Timestamp durableTimestamp) {
invariant(mode == repl::OplogApplication::Mode::kRecovering);
- repl::MultiApplier::Operations ops;
- {
- // Traverse the oplog chain with its own snapshot and read timestamp.
- ReadSourceScope readSourceScope(opCtx);
-
- // Get the corresponding prepare applyOps oplog entry.
- const auto prepareOplogEntry = getPreviousOplogEntry(opCtx, entry);
- ops = readTransactionOperationsFromOplogChain(opCtx, prepareOplogEntry, {}, boost::none);
- }
+ auto ops = readTransactionOperationsFromOplogChain(opCtx, entry, {});
const auto dbName = entry.getNss().db().toString();
Status status = Status::OK();
@@ -236,30 +228,41 @@ Status applyAbortTransaction(OperationContext* opCtx,
repl::MultiApplier::Operations readTransactionOperationsFromOplogChain(
OperationContext* opCtx,
- const OplogEntry& commitOrPrepare,
- const std::vector<OplogEntry*>& cachedOps,
- boost::optional<Timestamp> commitOplogEntryTS) {
- repl::MultiApplier::Operations ops;
+ const OplogEntry& lastEntryInTxn,
+ const std::vector<OplogEntry*>& cachedOps) noexcept {
+ // Traverse the oplog chain with its own snapshot and read timestamp.
+ ReadSourceScope readSourceScope(opCtx);
- // Get the previous oplog entry.
- auto currentOpTime = commitOrPrepare.getOpTime();
+ repl::MultiApplier::Operations ops;
// The cachedOps are the ops for this transaction that are from the same oplog application batch
// as the commit or prepare, those which have not necessarily been written to the oplog. These
// ops are in order of increasing timestamp.
+ const auto oldestEntryInBatch = cachedOps.empty() ? lastEntryInTxn : *cachedOps.front();
- // The lastEntryOpTime is the OpTime of the last (latest OpTime) entry for this transaction
+ // The lastEntryWrittenToOplogOpTime is the OpTime of the latest entry for this transaction
// which is expected to be present in the oplog. It is the entry before the first cachedOp,
// unless there are no cachedOps in which case it is the entry before the commit or prepare.
- const auto lastEntryOpTime = (cachedOps.empty() ? commitOrPrepare : *cachedOps.front())
- .getPrevWriteOpTimeInTransaction();
- invariant(lastEntryOpTime < currentOpTime);
+ const auto lastEntryWrittenToOplogOpTime = oldestEntryInBatch.getPrevWriteOpTimeInTransaction();
+ invariant(lastEntryWrittenToOplogOpTime < lastEntryInTxn.getOpTime());
+
+ TransactionHistoryIterator iter(lastEntryWrittenToOplogOpTime.get());
+
+ // If we started with a prepared commit, we want to forget about that operation and move on to
+ // the prepare.
+ auto prepareOrUnpreparedCommit = lastEntryInTxn;
+ if (lastEntryInTxn.isPreparedCommit()) {
+ // A prepared-commit must be in its own batch and thus have no cached ops.
+ invariant(cachedOps.empty());
+ invariant(iter.hasNext());
+ prepareOrUnpreparedCommit = iter.nextFatalOnErrors(opCtx);
+ }
+ invariant(prepareOrUnpreparedCommit.getCommandType() == OplogEntry::CommandType::kApplyOps);
- TransactionHistoryIterator iter(lastEntryOpTime.get());
- // Empty commits are not allowed, but empty prepares are.
- invariant(commitOrPrepare.getCommandType() != OplogEntry::CommandType::kCommitTransaction ||
- !cachedOps.empty() || iter.hasNext());
- auto commitOrPrepareObj = commitOrPrepare.toBSON();
+ // The non-DurableReplOperation fields of the extracted transaction operations will match those
+ // of the lastEntryInTxn. For a prepared commit, this will include the commit oplog entry's
+ // 'ts' field, which is what we want.
+ auto lastEntryInTxnObj = lastEntryInTxn.toBSON();
// First retrieve and transform the ops from the oplog, which will be retrieved in reverse
// order.
@@ -267,8 +270,8 @@ repl::MultiApplier::Operations readTransactionOperationsFromOplogChain(
const auto& operationEntry = iter.nextFatalOnErrors(opCtx);
invariant(operationEntry.isPartialTransaction());
auto prevOpsEnd = ops.size();
- repl::ApplyOps::extractOperationsTo(
- operationEntry, commitOrPrepareObj, &ops, commitOplogEntryTS);
+ repl::ApplyOps::extractOperationsTo(operationEntry, lastEntryInTxnObj, &ops);
+
// Because BSONArrays do not have fast way of determining size without iterating through
// them, and we also have no way of knowing how many oplog entries are in a transaction
// without iterating, reversing each applyOps and then reversing the whole array is
@@ -283,15 +286,11 @@ repl::MultiApplier::Operations readTransactionOperationsFromOplogChain(
for (auto* cachedOp : cachedOps) {
const auto& operationEntry = *cachedOp;
invariant(operationEntry.isPartialTransaction());
- repl::ApplyOps::extractOperationsTo(
- operationEntry, commitOrPrepareObj, &ops, commitOplogEntryTS);
+ repl::ApplyOps::extractOperationsTo(operationEntry, lastEntryInTxnObj, &ops);
}
- // Reconstruct the operations from the commit or prepare oplog entry.
- if (commitOrPrepare.getCommandType() == OplogEntry::CommandType::kApplyOps) {
- repl::ApplyOps::extractOperationsTo(
- commitOrPrepare, commitOrPrepareObj, &ops, commitOplogEntryTS);
- }
+ // Reconstruct the operations from the prepare or unprepared commit oplog entry.
+ repl::ApplyOps::extractOperationsTo(prepareOrUnpreparedCommit, lastEntryInTxnObj, &ops);
return ops;
}
@@ -307,10 +306,7 @@ Status _applyPrepareTransaction(OperationContext* opCtx,
// The operations here are reconstructed at their prepare time. However, that time will
// be ignored because there is an outer write unit of work during their application.
// The prepare time of the transaction is set explicitly below.
- auto ops = [&] {
- ReadSourceScope readSourceScope(opCtx);
- return readTransactionOperationsFromOplogChain(opCtx, entry, {}, boost::none);
- }();
+ auto ops = readTransactionOperationsFromOplogChain(opCtx, entry, {});
if (mode == repl::OplogApplication::Mode::kRecovering ||
mode == repl::OplogApplication::Mode::kInitialSync) {
@@ -399,10 +395,10 @@ void _reconstructPreparedTransaction(OperationContext* opCtx,
} // namespace
/**
- * Make sure that if we are in replication recovery or initial sync, we don't apply the prepare
- * transaction oplog entry until we either see a commit transaction oplog entry or are at the very
- * end of recovery/initial sync. Otherwise, only apply the prepare transaction oplog entry if we are
- * a secondary.
+ * Make sure that if we are in replication recovery, we don't apply the prepare transaction oplog
+ * entry until we either see a commit transaction oplog entry or are at the very end of recovery.
+ * Otherwise, only apply the prepare transaction oplog entry if we are a secondary. We should never
+ * get here during initial sync, and running this via the applyOps command returns an error.
*/
Status applyPrepareTransaction(OperationContext* opCtx,
const OplogEntry& entry,
@@ -422,10 +418,9 @@ Status applyPrepareTransaction(OperationContext* opCtx,
return Status::OK();
}
case repl::OplogApplication::Mode::kInitialSync: {
- // Don't apply the operations from the prepared transaction until either we see a commit
- // transaction oplog entry during the oplog application phase of initial sync or are at
- // the end of initial sync.
- return Status::OK();
+ // Initial sync should never apply 'prepareTransaction' since it unpacks committed
+ // transactions onto various applier threads at commit time.
+ MONGO_UNREACHABLE;
}
case repl::OplogApplication::Mode::kApplyOpsCmd: {
// Return error if run via applyOps command.
diff --git a/src/mongo/db/repl/transaction_oplog_application.h b/src/mongo/db/repl/transaction_oplog_application.h
index 5ad20f48875..ad85ccca6fa 100644
--- a/src/mongo/db/repl/transaction_oplog_application.h
+++ b/src/mongo/db/repl/transaction_oplog_application.h
@@ -63,8 +63,7 @@ const repl::OplogEntry getPreviousOplogEntry(OperationContext* opCtx,
repl::MultiApplier::Operations readTransactionOperationsFromOplogChain(
OperationContext* opCtx,
const repl::OplogEntry& entry,
- const std::vector<repl::OplogEntry*>& cachedOps,
- boost::optional<Timestamp> commitOplogEntryTS);
+ const std::vector<repl::OplogEntry*>& cachedOps) noexcept;
/**
* Apply `prepareTransaction` oplog entry.