diff options
Diffstat (limited to 'src/mongo/db/repl/session_update_tracker.cpp')
-rw-r--r-- | src/mongo/db/repl/session_update_tracker.cpp | 167 |
1 files changed, 126 insertions, 41 deletions
diff --git a/src/mongo/db/repl/session_update_tracker.cpp b/src/mongo/db/repl/session_update_tracker.cpp index 28a78ae7449..66132aebdde 100644 --- a/src/mongo/db/repl/session_update_tracker.cpp +++ b/src/mongo/db/repl/session_update_tracker.cpp @@ -38,6 +38,7 @@ #include "mongo/db/server_options.h" #include "mongo/db/session.h" #include "mongo/db/session_txn_record_gen.h" +#include "mongo/db/transaction_participant_gen.h" #include "mongo/util/assert_util.h" #include "mongo/util/log.h" @@ -46,6 +47,33 @@ namespace repl { namespace { /** + * Creates an oplog entry to perform an update on the transaction table. + */ +OplogEntry createOplogEntryForTransactionTableUpdate(repl::OpTime opTime, + const BSONObj& updateBSON, + const BSONObj& o2Field, + Date_t wallClockTime) { + return repl::OplogEntry(opTime, + boost::none, // hash + repl::OpTypeEnum::kUpdate, + NamespaceString::kSessionTransactionsTableNamespace, + boost::none, // uuid + false, // fromMigrate + repl::OplogEntry::kOplogVersion, + updateBSON, + o2Field, + {}, // sessionInfo + true, // upsert + wallClockTime, + boost::none, // statementId + boost::none, // prevWriteOpTime + boost::none, // preImangeOpTime + boost::none, // postImageOpTime + boost::none // prepare + ); +} + +/** * Constructs a new oplog entry if the given entry has transaction state embedded within in. The new * oplog entry will contain the operation needed to replicate the transaction table. * @@ -69,56 +97,35 @@ boost::optional<repl::OplogEntry> createMatchingTransactionTableUpdate( newTxnRecord.setLastWriteOpTime(entry.getOpTime()); newTxnRecord.setLastWriteDate(*entry.getWallClockTime()); - // "state" is a new field in 4.2. - if (serverGlobalParams.featureCompatibility.getVersion() >= - ServerGlobalParams::FeatureCompatibility::Version::kUpgradingTo42) { - switch (entry.getCommandType()) { - case repl::OplogEntry::CommandType::kApplyOps: - if (entry.shouldPrepare()) { - newTxnRecord.setState(DurableTxnStateEnum::kPrepared); - newTxnRecord.setStartOpTime(entry.getOpTime()); - } else { - newTxnRecord.setState(DurableTxnStateEnum::kCommitted); - } - break; - case repl::OplogEntry::CommandType::kCommitTransaction: - newTxnRecord.setState(DurableTxnStateEnum::kCommitted); - break; - case repl::OplogEntry::CommandType::kAbortTransaction: - newTxnRecord.setState(DurableTxnStateEnum::kAborted); - break; - default: - break; - } - } - return newTxnRecord.toBSON(); }(); - return repl::OplogEntry( + return createOplogEntryForTransactionTableUpdate( entry.getOpTime(), - boost::none, // hash - repl::OpTypeEnum::kUpdate, - NamespaceString::kSessionTransactionsTableNamespace, - boost::none, // uuid - false, // fromMigrate - repl::OplogEntry::kOplogVersion, updateBSON, BSON(SessionTxnRecord::kSessionIdFieldName << sessionInfo.getSessionId()->toBSON()), - {}, // sessionInfo - true, // upsert - *entry.getWallClockTime(), - boost::none, // statementId - boost::none, // prevWriteOpTime - boost::none, // preImangeOpTime - boost::none, // postImageOpTime - boost::none // prepare - ); + *entry.getWallClockTime()); +} + +/** + * Returns true if the oplog entry represents an operation in a transaction and false otherwise. + */ +bool isTransactionEntry(OplogEntry entry) { + auto sessionInfo = entry.getOperationSessionInfo(); + if (!sessionInfo.getTxnNumber()) { + return false; + } + + return entry.isInPendingTransaction() || + entry.getCommandType() == repl::OplogEntry::CommandType::kPrepareTransaction || + entry.getCommandType() == repl::OplogEntry::CommandType::kAbortTransaction || + entry.getCommandType() == repl::OplogEntry::CommandType::kCommitTransaction || + entry.getCommandType() == repl::OplogEntry::CommandType::kApplyOps; } } // namespace -boost::optional<std::vector<OplogEntry>> SessionUpdateTracker::updateOrFlush( +boost::optional<std::vector<OplogEntry>> SessionUpdateTracker::_updateOrFlush( const OplogEntry& entry) { const auto& ns = entry.getNss(); @@ -131,6 +138,16 @@ boost::optional<std::vector<OplogEntry>> SessionUpdateTracker::updateOrFlush( return boost::none; } +boost::optional<std::vector<OplogEntry>> SessionUpdateTracker::updateSession( + const OplogEntry& entry) { + if (!isTransactionEntry(entry)) { + return _updateOrFlush(entry); + } + auto txnTableUpdate = _createTransactionTableUpdateFromTransactionOp(entry); + return (txnTableUpdate) ? boost::optional<std::vector<OplogEntry>>({*txnTableUpdate}) + : boost::none; +} + void SessionUpdateTracker::_updateSessionInfo(const OplogEntry& entry) { const auto& sessionInfo = entry.getOperationSessionInfo(); @@ -204,7 +221,6 @@ std::vector<OplogEntry> SessionUpdateTracker::flushAll() { invariant(newUpdate); opList.push_back(std::move(*newUpdate)); } - _sessionsToUpdate.clear(); return opList; @@ -229,5 +245,74 @@ std::vector<OplogEntry> SessionUpdateTracker::_flushForQueryPredicate( return opList; } +boost::optional<OplogEntry> SessionUpdateTracker::_createTransactionTableUpdateFromTransactionOp( + const repl::OplogEntry& entry) { + auto sessionInfo = entry.getOperationSessionInfo(); + + // We only update the transaction table on the first inTxn operation. + if (entry.isInPendingTransaction() && !entry.getPrevWriteOpTimeInTransaction()->isNull()) { + return boost::none; + } + invariant(sessionInfo.getSessionId()); + invariant(entry.getWallClockTime()); + + const auto updateBSON = [&] { + SessionTxnRecord newTxnRecord; + newTxnRecord.setSessionId(*sessionInfo.getSessionId()); + newTxnRecord.setTxnNum(*sessionInfo.getTxnNumber()); + newTxnRecord.setLastWriteOpTime(entry.getOpTime()); + newTxnRecord.setLastWriteDate(*entry.getWallClockTime()); + + // "state" is a new field in 4.2. + if (serverGlobalParams.featureCompatibility.getVersion() < + ServerGlobalParams::FeatureCompatibility::Version::kUpgradingTo42) { + return newTxnRecord.toBSON(); + } + + if (entry.isInPendingTransaction()) { + invariant(entry.getPrevWriteOpTimeInTransaction()->isNull()); + newTxnRecord.setState(DurableTxnStateEnum::kInProgress); + newTxnRecord.setStartOpTime(entry.getOpTime()); + return newTxnRecord.toBSON(); + } + switch (entry.getCommandType()) { + case repl::OplogEntry::CommandType::kApplyOps: + if (entry.shouldPrepare()) { + newTxnRecord.setState(DurableTxnStateEnum::kPrepared); + newTxnRecord.setStartOpTime(entry.getOpTime()); + } else { + newTxnRecord.setState(DurableTxnStateEnum::kCommitted); + } + break; + case repl::OplogEntry::CommandType::kPrepareTransaction: + newTxnRecord.setState(DurableTxnStateEnum::kPrepared); + if (entry.getPrevWriteOpTimeInTransaction()->isNull()) { + // The 'prepareTransaction' entry is the first operation of the transaction. + newTxnRecord.setStartOpTime(entry.getOpTime()); + } else { + // Update the transaction record using $set to avoid overwriting the + // startOpTime. + return BSON("$set" << newTxnRecord.toBSON()); + } + break; + case repl::OplogEntry::CommandType::kCommitTransaction: + newTxnRecord.setState(DurableTxnStateEnum::kCommitted); + break; + case repl::OplogEntry::CommandType::kAbortTransaction: + newTxnRecord.setState(DurableTxnStateEnum::kAborted); + break; + default: + break; + } + return newTxnRecord.toBSON(); + }(); + + return createOplogEntryForTransactionTableUpdate( + entry.getOpTime(), + updateBSON, + BSON(SessionTxnRecord::kSessionIdFieldName << sessionInfo.getSessionId()->toBSON()), + *entry.getWallClockTime()); +} + } // namespace repl } // namespace mongo |