summaryrefslogtreecommitdiff
path: root/src/mongo/db/repl/sync_tail.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'src/mongo/db/repl/sync_tail.cpp')
-rw-r--r--src/mongo/db/repl/sync_tail.cpp81
1 files changed, 74 insertions, 7 deletions
diff --git a/src/mongo/db/repl/sync_tail.cpp b/src/mongo/db/repl/sync_tail.cpp
index bc2bfb4c37d..069e56afba4 100644
--- a/src/mongo/db/repl/sync_tail.cpp
+++ b/src/mongo/db/repl/sync_tail.cpp
@@ -49,6 +49,7 @@
#include "mongo/db/client.h"
#include "mongo/db/commands/fsync.h"
#include "mongo/db/commands/server_status_metric.h"
+#include "mongo/db/commands/txn_cmds_gen.h"
#include "mongo/db/concurrency/d_concurrency.h"
#include "mongo/db/concurrency/lock_state.h"
#include "mongo/db/concurrency/replication_state_transition_lock_guard.h"
@@ -68,6 +69,7 @@
#include "mongo/db/repl/repl_client_info.h"
#include "mongo/db/repl/repl_set_config.h"
#include "mongo/db/repl/replication_coordinator.h"
+#include "mongo/db/repl/transaction_oplog_application.h"
#include "mongo/db/session.h"
#include "mongo/db/session_txn_record_gen.h"
#include "mongo/db/stats/timer_stats.h"
@@ -802,6 +804,21 @@ void SyncTail::_oplogApplication(OplogBuffer* oplogBuffer,
}
}
+// Tells whether 'entry' is the commitTransaction of a transaction that was never prepared.
+// Such an entry is a commitTransaction command whose object carries a boolean "prepared" field
+// with the value false; a missing or non-boolean "prepared" field does not qualify.
+inline bool isUnpreparedCommit(const OplogEntry& entry) {
+    if (entry.getCommandType() != OplogEntry::CommandType::kCommitTransaction) {
+        return false;
+    }
+    // Look the field up once and inspect the resulting element.
+    const auto& commitObj = entry.getObject();
+    const auto preparedElem = commitObj[CommitTransactionOplogObject::kPreparedFieldName];
+    return preparedElem.isBoolean() && !preparedElem.boolean();
+}
+
+// Tells whether 'entry' is an applyOps command that stands on its own as a self-contained atomic
+// operation, i.e. one for which shouldPrepare() is false, rather than being part of a prepared
+// transaction.
+inline bool isUnpreparedApplyOps(const OplogEntry& entry) {
+    if (entry.getCommandType() != OplogEntry::CommandType::kApplyOps) {
+        return false;
+    }
+    return !entry.shouldPrepare();
+}
+
// Copies ops out of the bgsync queue into the deque passed in as a parameter.
// Returns true if the batch should be ended early.
// Batch should end early if we encounter a command, or if
@@ -870,9 +887,10 @@ bool SyncTail::tryPopAndWaitForMore(OperationContext* opCtx,
return true;
}
- // Commands must be processed one at a time. The only exception to this is applyOps because
- // applyOps oplog entries are effectively containers for CRUD operations. Therefore, it is safe
- // to batch applyOps commands with CRUD operations when reading from the oplog buffer.
+ // Commands must be processed one at a time. The exceptions to this are unprepared applyOps,
+ // because applyOps oplog entries are effectively containers for CRUD operations, and unprepared
+ // commitTransaction, because that also expands to CRUD operations. Therefore, it is safe to
+ // batch both kinds of command with CRUD operations when reading from the oplog buffer.
//
// Oplog entries on 'system.views' should also be processed one at a time. View catalog
// immediately reflects changes for each oplog entry so we can see inconsistent view catalog if
@@ -880,8 +898,7 @@ bool SyncTail::tryPopAndWaitForMore(OperationContext* opCtx,
//
// Process updates to 'admin.system.version' individually as well so the secondary's FCV when
// processing each operation matches the primary's when committing that operation.
- if ((entry.isCommand() &&
- (entry.getCommandType() != OplogEntry::CommandType::kApplyOps || entry.shouldPrepare())) ||
+ if ((entry.isCommand() && (!isUnpreparedCommit(entry) && !isUnpreparedApplyOps(entry))) ||
entry.getNss().isSystemDotViews() || entry.getNss().isServerConfigurationCollection()) {
if (ops->getCount() == 1) {
// apply commands one-at-a-time
@@ -1170,7 +1187,8 @@ Status multiSyncApply(OperationContext* opCtx,
* vector in any other way.
* writerVectors - Set of operations for each worker thread to apply.
* derivedOps - If provided, this function inserts a decomposition of applyOps operations
- * and instructions for updating the transactions table.
+ * and instructions for updating the transactions table. Required if processing oplogs
+ * with transactions.
* sessionUpdateTracker - if provided, keeps track of session info from ops.
*/
void SyncTail::_fillWriterVectors(OperationContext* opCtx,
@@ -1185,6 +1203,7 @@ void SyncTail::_fillWriterVectors(OperationContext* opCtx,
const uint32_t numWriters = writerVectors->size();
CachedCollectionProperties collPropertiesCache;
+ LogicalSessionIdMap<std::vector<OplogEntry*>> pendingTxnOps;
for (auto&& op : *ops) {
// If the operation's optime is before or the same as the beginApplyingOpTime we don't want
@@ -1208,6 +1227,20 @@ void SyncTail::_fillWriterVectors(OperationContext* opCtx,
}
}
+ // If this entry is part of a multi-oplog-entry transaction, ignore it until the commit.
+ // We must save it here because we are not guaranteed it has been written to the oplog
+ // yet.
+ if (op.isInPendingTransaction()) {
+ auto& pendingList = pendingTxnOps[*op.getSessionId()];
+ if (!pendingList.empty() && pendingList.front()->getTxnNumber() != op.getTxnNumber()) {
+ // TODO: When abortTransaction is implemented, this should invariant and
+ // the list should be cleared on abort.
+ pendingList.clear();
+ }
+ pendingList.push_back(&op);
+ continue;
+ }
+
if (op.isCrudOpType()) {
auto collProperties = collPropertiesCache.getCollectionProperties(opCtx, hashedNs);
@@ -1233,7 +1266,7 @@ void SyncTail::_fillWriterVectors(OperationContext* opCtx,
// Extract applyOps operations and fill writers with extracted operations using this
// function.
- if (op.getCommandType() == OplogEntry::CommandType::kApplyOps && !op.shouldPrepare()) {
+ if (isUnpreparedApplyOps(op)) {
try {
derivedOps->emplace_back(ApplyOps::extractOperations(op));
@@ -1247,6 +1280,40 @@ void SyncTail::_fillWriterVectors(OperationContext* opCtx,
<< redact(op.toBSON())));
}
continue;
+ } else if (isUnpreparedCommit(op)) {
+ // On commit of unprepared transactions, get transactional operations from the oplog and
+ // fill writers with those operations.
+ try {
+ invariant(derivedOps);
+ auto& pendingList = pendingTxnOps[*op.getSessionId()];
+ {
+ // We need to create an alternate opCtx to avoid the reads of the transaction
+ // messing up the state of the main opCtx. In particular we do not want to
+ // set the ReadSource to kLastApplied for the main opCtx.
+ // TODO(SERVER-40053): This should be no longer necessary after
+ // SERVER-40053 makes the transaction history iterator
+ // avoid changing the read source.
+ auto newClient =
+ opCtx->getServiceContext()->makeClient("read-pending-transactions");
+ AlternativeClientRegion acr(newClient);
+ auto newOpCtx = cc().makeOperationContext();
+ ShouldNotConflictWithSecondaryBatchApplicationBlock shouldNotConflictBlock(
+ newOpCtx->lockState());
+ derivedOps->emplace_back(
+ readTransactionOperationsFromOplogChain(newOpCtx.get(), op, pendingList));
+ pendingList.clear();
+ }
+ // Transaction entries cannot have different session updates.
+ _fillWriterVectors(opCtx, &derivedOps->back(), writerVectors, derivedOps, nullptr);
+ } catch (...) {
+ fassertFailedWithStatusNoTrace(
+ 51116,
+ exceptionToStatus().withContext(str::stream()
+ << "Unable to read operations for transaction "
+ << "commit "
+ << redact(op.toBSON())));
+ }
+ continue;
}
auto& writer = (*writerVectors)[hash % numWriters];