summaryrefslogtreecommitdiff
path: root/src/mongo/db/repl/session_update_tracker.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'src/mongo/db/repl/session_update_tracker.cpp')
-rw-r--r--src/mongo/db/repl/session_update_tracker.cpp167
1 files changed, 126 insertions, 41 deletions
diff --git a/src/mongo/db/repl/session_update_tracker.cpp b/src/mongo/db/repl/session_update_tracker.cpp
index 28a78ae7449..66132aebdde 100644
--- a/src/mongo/db/repl/session_update_tracker.cpp
+++ b/src/mongo/db/repl/session_update_tracker.cpp
@@ -38,6 +38,7 @@
#include "mongo/db/server_options.h"
#include "mongo/db/session.h"
#include "mongo/db/session_txn_record_gen.h"
+#include "mongo/db/transaction_participant_gen.h"
#include "mongo/util/assert_util.h"
#include "mongo/util/log.h"
@@ -46,6 +47,33 @@ namespace repl {
namespace {
/**
+ * Creates an oplog entry to perform an update on the transaction table.
+ */
+OplogEntry createOplogEntryForTransactionTableUpdate(repl::OpTime opTime,
+ const BSONObj& updateBSON,
+ const BSONObj& o2Field,
+ Date_t wallClockTime) {
+ return repl::OplogEntry(opTime,
+ boost::none, // hash
+ repl::OpTypeEnum::kUpdate,
+ NamespaceString::kSessionTransactionsTableNamespace,
+ boost::none, // uuid
+ false, // fromMigrate
+ repl::OplogEntry::kOplogVersion,
+ updateBSON,
+ o2Field,
+ {}, // sessionInfo
+ true, // upsert
+ wallClockTime,
+ boost::none, // statementId
+ boost::none, // prevWriteOpTime
+ boost::none, // preImangeOpTime
+ boost::none, // postImageOpTime
+ boost::none // prepare
+ );
+}
+
+/**
* Constructs a new oplog entry if the given entry has transaction state embedded within in. The new
* oplog entry will contain the operation needed to replicate the transaction table.
*
@@ -69,56 +97,35 @@ boost::optional<repl::OplogEntry> createMatchingTransactionTableUpdate(
newTxnRecord.setLastWriteOpTime(entry.getOpTime());
newTxnRecord.setLastWriteDate(*entry.getWallClockTime());
- // "state" is a new field in 4.2.
- if (serverGlobalParams.featureCompatibility.getVersion() >=
- ServerGlobalParams::FeatureCompatibility::Version::kUpgradingTo42) {
- switch (entry.getCommandType()) {
- case repl::OplogEntry::CommandType::kApplyOps:
- if (entry.shouldPrepare()) {
- newTxnRecord.setState(DurableTxnStateEnum::kPrepared);
- newTxnRecord.setStartOpTime(entry.getOpTime());
- } else {
- newTxnRecord.setState(DurableTxnStateEnum::kCommitted);
- }
- break;
- case repl::OplogEntry::CommandType::kCommitTransaction:
- newTxnRecord.setState(DurableTxnStateEnum::kCommitted);
- break;
- case repl::OplogEntry::CommandType::kAbortTransaction:
- newTxnRecord.setState(DurableTxnStateEnum::kAborted);
- break;
- default:
- break;
- }
- }
-
return newTxnRecord.toBSON();
}();
- return repl::OplogEntry(
+ return createOplogEntryForTransactionTableUpdate(
entry.getOpTime(),
- boost::none, // hash
- repl::OpTypeEnum::kUpdate,
- NamespaceString::kSessionTransactionsTableNamespace,
- boost::none, // uuid
- false, // fromMigrate
- repl::OplogEntry::kOplogVersion,
updateBSON,
BSON(SessionTxnRecord::kSessionIdFieldName << sessionInfo.getSessionId()->toBSON()),
- {}, // sessionInfo
- true, // upsert
- *entry.getWallClockTime(),
- boost::none, // statementId
- boost::none, // prevWriteOpTime
- boost::none, // preImangeOpTime
- boost::none, // postImageOpTime
- boost::none // prepare
- );
+ *entry.getWallClockTime());
+}
+
+/**
+ * Returns true if the oplog entry represents an operation in a transaction and false otherwise.
+ */
+bool isTransactionEntry(OplogEntry entry) {
+ auto sessionInfo = entry.getOperationSessionInfo();
+ if (!sessionInfo.getTxnNumber()) {
+ return false;
+ }
+
+ return entry.isInPendingTransaction() ||
+ entry.getCommandType() == repl::OplogEntry::CommandType::kPrepareTransaction ||
+ entry.getCommandType() == repl::OplogEntry::CommandType::kAbortTransaction ||
+ entry.getCommandType() == repl::OplogEntry::CommandType::kCommitTransaction ||
+ entry.getCommandType() == repl::OplogEntry::CommandType::kApplyOps;
}
} // namespace
-boost::optional<std::vector<OplogEntry>> SessionUpdateTracker::updateOrFlush(
+boost::optional<std::vector<OplogEntry>> SessionUpdateTracker::_updateOrFlush(
const OplogEntry& entry) {
const auto& ns = entry.getNss();
@@ -131,6 +138,16 @@ boost::optional<std::vector<OplogEntry>> SessionUpdateTracker::updateOrFlush(
return boost::none;
}
+boost::optional<std::vector<OplogEntry>> SessionUpdateTracker::updateSession(
+ const OplogEntry& entry) {
+ if (!isTransactionEntry(entry)) {
+ return _updateOrFlush(entry);
+ }
+ auto txnTableUpdate = _createTransactionTableUpdateFromTransactionOp(entry);
+ return (txnTableUpdate) ? boost::optional<std::vector<OplogEntry>>({*txnTableUpdate})
+ : boost::none;
+}
+
void SessionUpdateTracker::_updateSessionInfo(const OplogEntry& entry) {
const auto& sessionInfo = entry.getOperationSessionInfo();
@@ -204,7 +221,6 @@ std::vector<OplogEntry> SessionUpdateTracker::flushAll() {
invariant(newUpdate);
opList.push_back(std::move(*newUpdate));
}
-
_sessionsToUpdate.clear();
return opList;
@@ -229,5 +245,74 @@ std::vector<OplogEntry> SessionUpdateTracker::_flushForQueryPredicate(
return opList;
}
+boost::optional<OplogEntry> SessionUpdateTracker::_createTransactionTableUpdateFromTransactionOp(
+ const repl::OplogEntry& entry) {
+ auto sessionInfo = entry.getOperationSessionInfo();
+
+ // We only update the transaction table on the first inTxn operation.
+ if (entry.isInPendingTransaction() && !entry.getPrevWriteOpTimeInTransaction()->isNull()) {
+ return boost::none;
+ }
+ invariant(sessionInfo.getSessionId());
+ invariant(entry.getWallClockTime());
+
+ const auto updateBSON = [&] {
+ SessionTxnRecord newTxnRecord;
+ newTxnRecord.setSessionId(*sessionInfo.getSessionId());
+ newTxnRecord.setTxnNum(*sessionInfo.getTxnNumber());
+ newTxnRecord.setLastWriteOpTime(entry.getOpTime());
+ newTxnRecord.setLastWriteDate(*entry.getWallClockTime());
+
+ // "state" is a new field in 4.2.
+ if (serverGlobalParams.featureCompatibility.getVersion() <
+ ServerGlobalParams::FeatureCompatibility::Version::kUpgradingTo42) {
+ return newTxnRecord.toBSON();
+ }
+
+ if (entry.isInPendingTransaction()) {
+ invariant(entry.getPrevWriteOpTimeInTransaction()->isNull());
+ newTxnRecord.setState(DurableTxnStateEnum::kInProgress);
+ newTxnRecord.setStartOpTime(entry.getOpTime());
+ return newTxnRecord.toBSON();
+ }
+ switch (entry.getCommandType()) {
+ case repl::OplogEntry::CommandType::kApplyOps:
+ if (entry.shouldPrepare()) {
+ newTxnRecord.setState(DurableTxnStateEnum::kPrepared);
+ newTxnRecord.setStartOpTime(entry.getOpTime());
+ } else {
+ newTxnRecord.setState(DurableTxnStateEnum::kCommitted);
+ }
+ break;
+ case repl::OplogEntry::CommandType::kPrepareTransaction:
+ newTxnRecord.setState(DurableTxnStateEnum::kPrepared);
+ if (entry.getPrevWriteOpTimeInTransaction()->isNull()) {
+ // The 'prepareTransaction' entry is the first operation of the transaction.
+ newTxnRecord.setStartOpTime(entry.getOpTime());
+ } else {
+ // Update the transaction record using $set to avoid overwriting the
+ // startOpTime.
+ return BSON("$set" << newTxnRecord.toBSON());
+ }
+ break;
+ case repl::OplogEntry::CommandType::kCommitTransaction:
+ newTxnRecord.setState(DurableTxnStateEnum::kCommitted);
+ break;
+ case repl::OplogEntry::CommandType::kAbortTransaction:
+ newTxnRecord.setState(DurableTxnStateEnum::kAborted);
+ break;
+ default:
+ break;
+ }
+ return newTxnRecord.toBSON();
+ }();
+
+ return createOplogEntryForTransactionTableUpdate(
+ entry.getOpTime(),
+ updateBSON,
+ BSON(SessionTxnRecord::kSessionIdFieldName << sessionInfo.getSessionId()->toBSON()),
+ *entry.getWallClockTime());
+}
+
} // namespace repl
} // namespace mongo