diff options
Diffstat (limited to 'src/mongo/db/repl/sync_tail.cpp')
-rw-r--r-- | src/mongo/db/repl/sync_tail.cpp | 81 |
1 files changed, 74 insertions, 7 deletions
diff --git a/src/mongo/db/repl/sync_tail.cpp b/src/mongo/db/repl/sync_tail.cpp index bc2bfb4c37d..069e56afba4 100644 --- a/src/mongo/db/repl/sync_tail.cpp +++ b/src/mongo/db/repl/sync_tail.cpp @@ -49,6 +49,7 @@ #include "mongo/db/client.h" #include "mongo/db/commands/fsync.h" #include "mongo/db/commands/server_status_metric.h" +#include "mongo/db/commands/txn_cmds_gen.h" #include "mongo/db/concurrency/d_concurrency.h" #include "mongo/db/concurrency/lock_state.h" #include "mongo/db/concurrency/replication_state_transition_lock_guard.h" @@ -68,6 +69,7 @@ #include "mongo/db/repl/repl_client_info.h" #include "mongo/db/repl/repl_set_config.h" #include "mongo/db/repl/replication_coordinator.h" +#include "mongo/db/repl/transaction_oplog_application.h" #include "mongo/db/session.h" #include "mongo/db/session_txn_record_gen.h" #include "mongo/db/stats/timer_stats.h" @@ -802,6 +804,21 @@ void SyncTail::_oplogApplication(OplogBuffer* oplogBuffer, } } +// Returns whether an oplog entry represents a commitTransaction for a transaction which has not +// been prepared. An entry is an unprepared commit if it has a boolean "prepared" field set to +// false. +inline bool isUnpreparedCommit(const OplogEntry& entry) { + return entry.getCommandType() == OplogEntry::CommandType::kCommitTransaction && + entry.getObject()[CommitTransactionOplogObject::kPreparedFieldName].isBoolean() && + !entry.getObject()[CommitTransactionOplogObject::kPreparedFieldName].boolean(); +} + +// Returns whether an oplog entry represents an applyOps which is a self-contained atomic operation, +// as opposed to part of a prepared transaction. +inline bool isUnpreparedApplyOps(const OplogEntry& entry) { + return entry.getCommandType() == OplogEntry::CommandType::kApplyOps && !entry.shouldPrepare(); +} + // Copies ops out of the bgsync queue into the deque passed in as a parameter. // Returns true if the batch should be ended early. 
// Batch should end early if we encounter a command, or if @@ -870,9 +887,10 @@ bool SyncTail::tryPopAndWaitForMore(OperationContext* opCtx, return true; } - // Commands must be processed one at a time. The only exception to this is applyOps because - // applyOps oplog entries are effectively containers for CRUD operations. Therefore, it is safe - // to batch applyOps commands with CRUD operations when reading from the oplog buffer. + // Commands must be processed one at a time. The exceptions to this are unprepared applyOps, + // because applyOps oplog entries are effectively containers for CRUD operations, and unprepared + // commitTransaction, because that also expands to CRUD operations. Therefore, it is safe to + // batch both kinds of command with CRUD operations when reading from the oplog buffer. // // Oplog entries on 'system.views' should also be processed one at a time. View catalog // immediately reflects changes for each oplog entry so we can see inconsistent view catalog if @@ -880,8 +898,7 @@ bool SyncTail::tryPopAndWaitForMore(OperationContext* opCtx, // // Process updates to 'admin.system.version' individually as well so the secondary's FCV when // processing each operation matches the primary's when committing that operation. - if ((entry.isCommand() && - (entry.getCommandType() != OplogEntry::CommandType::kApplyOps || entry.shouldPrepare())) || + if ((entry.isCommand() && (!isUnpreparedCommit(entry) && !isUnpreparedApplyOps(entry))) || entry.getNss().isSystemDotViews() || entry.getNss().isServerConfigurationCollection()) { if (ops->getCount() == 1) { // apply commands one-at-a-time @@ -1170,7 +1187,8 @@ Status multiSyncApply(OperationContext* opCtx, * vector in any other way. * writerVectors - Set of operations for each worker thread to apply. * derivedOps - If provided, this function inserts a decomposition of applyOps operations - * and instructions for updating the transactions table. + * and instructions for updating the transactions table. 
Required if processing oplogs + * with transactions. * sessionUpdateTracker - if provided, keeps track of session info from ops. */ void SyncTail::_fillWriterVectors(OperationContext* opCtx, @@ -1185,6 +1203,7 @@ void SyncTail::_fillWriterVectors(OperationContext* opCtx, const uint32_t numWriters = writerVectors->size(); CachedCollectionProperties collPropertiesCache; + LogicalSessionIdMap<std::vector<OplogEntry*>> pendingTxnOps; for (auto&& op : *ops) { // If the operation's optime is before or the same as the beginApplyingOpTime we don't want @@ -1208,6 +1227,20 @@ void SyncTail::_fillWriterVectors(OperationContext* opCtx, } } + // If this entry is part of a multi-oplog-entry transaction, ignore it until the commit. + // We must save it here because we are not guaranteed it has been written to the oplog + // yet. + if (op.isInPendingTransaction()) { + auto& pendingList = pendingTxnOps[*op.getSessionId()]; + if (!pendingList.empty() && pendingList.front()->getTxnNumber() != op.getTxnNumber()) { + // TODO: When abortTransaction is implemented, this should invariant and + // the list should be cleared on abort. + pendingList.clear(); + } + pendingList.push_back(&op); + continue; + } + if (op.isCrudOpType()) { auto collProperties = collPropertiesCache.getCollectionProperties(opCtx, hashedNs); @@ -1233,7 +1266,7 @@ void SyncTail::_fillWriterVectors(OperationContext* opCtx, // Extract applyOps operations and fill writers with extracted operations using this // function. - if (op.getCommandType() == OplogEntry::CommandType::kApplyOps && !op.shouldPrepare()) { + if (isUnpreparedApplyOps(op)) { try { derivedOps->emplace_back(ApplyOps::extractOperations(op)); @@ -1247,6 +1280,40 @@ void SyncTail::_fillWriterVectors(OperationContext* opCtx, << redact(op.toBSON()))); } continue; + } else if (isUnpreparedCommit(op)) { + // On commit of unprepared transactions, get transactional operations from the oplog and + // fill writers with those operations. 
+ try { + invariant(derivedOps); + auto& pendingList = pendingTxnOps[*op.getSessionId()]; + { + // We need to create an alternate opCtx to avoid the reads of the transaction + // messing up the state of the main opCtx. In particular we do not want to + // set the ReadSource to kLastApplied for the main opCtx. + // TODO(SERVER-40053): This should be no longer necessary after + // SERVER-40053 makes the transaction history iterator + // avoid changing the read source. + auto newClient = + opCtx->getServiceContext()->makeClient("read-pending-transactions"); + AlternativeClientRegion acr(newClient); + auto newOpCtx = cc().makeOperationContext(); + ShouldNotConflictWithSecondaryBatchApplicationBlock shouldNotConflictBlock( + newOpCtx->lockState()); + derivedOps->emplace_back( + readTransactionOperationsFromOplogChain(newOpCtx.get(), op, pendingList)); + pendingList.clear(); + } + // Transaction entries cannot have different session updates. + _fillWriterVectors(opCtx, &derivedOps->back(), writerVectors, derivedOps, nullptr); + } catch (...) { + fassertFailedWithStatusNoTrace( + 51116, + exceptionToStatus().withContext(str::stream() + << "Unable to read operations for transaction " + << "commit " + << redact(op.toBSON()))); + } + continue; } auto& writer = (*writerVectors)[hash % numWriters]; |