summary refs log tree commit diff
diff options
context:
space:
mode:
-rw-r--r-- src/mongo/db/repl/session_update_tracker.cpp 167
-rw-r--r-- src/mongo/db/repl/session_update_tracker.h 26
-rw-r--r-- src/mongo/db/repl/sync_tail.cpp 4
-rw-r--r-- src/mongo/db/repl/sync_tail_test.cpp 200
-rw-r--r-- src/mongo/db/repl/sync_tail_test_fixture.cpp 29
-rw-r--r-- src/mongo/db/repl/sync_tail_test_fixture.h 10
6 files changed, 367 insertions, 69 deletions
diff --git a/src/mongo/db/repl/session_update_tracker.cpp b/src/mongo/db/repl/session_update_tracker.cpp
index 28a78ae7449..66132aebdde 100644
--- a/src/mongo/db/repl/session_update_tracker.cpp
+++ b/src/mongo/db/repl/session_update_tracker.cpp
@@ -38,6 +38,7 @@
#include "mongo/db/server_options.h"
#include "mongo/db/session.h"
#include "mongo/db/session_txn_record_gen.h"
+#include "mongo/db/transaction_participant_gen.h"
#include "mongo/util/assert_util.h"
#include "mongo/util/log.h"
@@ -46,6 +47,33 @@ namespace repl {
namespace {
/**
+ * Creates an oplog entry to perform an update on the transaction table.
+ */
+OplogEntry createOplogEntryForTransactionTableUpdate(repl::OpTime opTime,
+ const BSONObj& updateBSON,
+ const BSONObj& o2Field,
+ Date_t wallClockTime) {
+ return repl::OplogEntry(opTime,
+ boost::none, // hash
+ repl::OpTypeEnum::kUpdate,
+ NamespaceString::kSessionTransactionsTableNamespace,
+ boost::none, // uuid
+ false, // fromMigrate
+ repl::OplogEntry::kOplogVersion,
+ updateBSON,
+ o2Field,
+ {}, // sessionInfo
+ true, // upsert
+ wallClockTime,
+ boost::none, // statementId
+ boost::none, // prevWriteOpTime
+ boost::none, // preImageOpTime
+ boost::none, // postImageOpTime
+ boost::none // prepare
+ );
+}
+
+/**
* Constructs a new oplog entry if the given entry has transaction state embedded within in. The new
* oplog entry will contain the operation needed to replicate the transaction table.
*
@@ -69,56 +97,35 @@ boost::optional<repl::OplogEntry> createMatchingTransactionTableUpdate(
newTxnRecord.setLastWriteOpTime(entry.getOpTime());
newTxnRecord.setLastWriteDate(*entry.getWallClockTime());
- // "state" is a new field in 4.2.
- if (serverGlobalParams.featureCompatibility.getVersion() >=
- ServerGlobalParams::FeatureCompatibility::Version::kUpgradingTo42) {
- switch (entry.getCommandType()) {
- case repl::OplogEntry::CommandType::kApplyOps:
- if (entry.shouldPrepare()) {
- newTxnRecord.setState(DurableTxnStateEnum::kPrepared);
- newTxnRecord.setStartOpTime(entry.getOpTime());
- } else {
- newTxnRecord.setState(DurableTxnStateEnum::kCommitted);
- }
- break;
- case repl::OplogEntry::CommandType::kCommitTransaction:
- newTxnRecord.setState(DurableTxnStateEnum::kCommitted);
- break;
- case repl::OplogEntry::CommandType::kAbortTransaction:
- newTxnRecord.setState(DurableTxnStateEnum::kAborted);
- break;
- default:
- break;
- }
- }
-
return newTxnRecord.toBSON();
}();
- return repl::OplogEntry(
+ return createOplogEntryForTransactionTableUpdate(
entry.getOpTime(),
- boost::none, // hash
- repl::OpTypeEnum::kUpdate,
- NamespaceString::kSessionTransactionsTableNamespace,
- boost::none, // uuid
- false, // fromMigrate
- repl::OplogEntry::kOplogVersion,
updateBSON,
BSON(SessionTxnRecord::kSessionIdFieldName << sessionInfo.getSessionId()->toBSON()),
- {}, // sessionInfo
- true, // upsert
- *entry.getWallClockTime(),
- boost::none, // statementId
- boost::none, // prevWriteOpTime
- boost::none, // preImangeOpTime
- boost::none, // postImageOpTime
- boost::none // prepare
- );
+ *entry.getWallClockTime());
+}
+
+/**
+ * Returns true if the oplog entry represents an operation in a transaction and false otherwise.
+ */
+bool isTransactionEntry(OplogEntry entry) {
+ auto sessionInfo = entry.getOperationSessionInfo();
+ if (!sessionInfo.getTxnNumber()) {
+ return false;
+ }
+
+ return entry.isInPendingTransaction() ||
+ entry.getCommandType() == repl::OplogEntry::CommandType::kPrepareTransaction ||
+ entry.getCommandType() == repl::OplogEntry::CommandType::kAbortTransaction ||
+ entry.getCommandType() == repl::OplogEntry::CommandType::kCommitTransaction ||
+ entry.getCommandType() == repl::OplogEntry::CommandType::kApplyOps;
}
} // namespace
-boost::optional<std::vector<OplogEntry>> SessionUpdateTracker::updateOrFlush(
+boost::optional<std::vector<OplogEntry>> SessionUpdateTracker::_updateOrFlush(
const OplogEntry& entry) {
const auto& ns = entry.getNss();
@@ -131,6 +138,16 @@ boost::optional<std::vector<OplogEntry>> SessionUpdateTracker::updateOrFlush(
return boost::none;
}
+boost::optional<std::vector<OplogEntry>> SessionUpdateTracker::updateSession(
+ const OplogEntry& entry) {
+ if (!isTransactionEntry(entry)) {
+ return _updateOrFlush(entry);
+ }
+ auto txnTableUpdate = _createTransactionTableUpdateFromTransactionOp(entry);
+ return (txnTableUpdate) ? boost::optional<std::vector<OplogEntry>>({*txnTableUpdate})
+ : boost::none;
+}
+
void SessionUpdateTracker::_updateSessionInfo(const OplogEntry& entry) {
const auto& sessionInfo = entry.getOperationSessionInfo();
@@ -204,7 +221,6 @@ std::vector<OplogEntry> SessionUpdateTracker::flushAll() {
invariant(newUpdate);
opList.push_back(std::move(*newUpdate));
}
-
_sessionsToUpdate.clear();
return opList;
@@ -229,5 +245,74 @@ std::vector<OplogEntry> SessionUpdateTracker::_flushForQueryPredicate(
return opList;
}
+boost::optional<OplogEntry> SessionUpdateTracker::_createTransactionTableUpdateFromTransactionOp(
+ const repl::OplogEntry& entry) {
+ auto sessionInfo = entry.getOperationSessionInfo();
+
+ // We only update the transaction table on the first inTxn operation.
+ if (entry.isInPendingTransaction() && !entry.getPrevWriteOpTimeInTransaction()->isNull()) {
+ return boost::none;
+ }
+ invariant(sessionInfo.getSessionId());
+ invariant(entry.getWallClockTime());
+
+ const auto updateBSON = [&] {
+ SessionTxnRecord newTxnRecord;
+ newTxnRecord.setSessionId(*sessionInfo.getSessionId());
+ newTxnRecord.setTxnNum(*sessionInfo.getTxnNumber());
+ newTxnRecord.setLastWriteOpTime(entry.getOpTime());
+ newTxnRecord.setLastWriteDate(*entry.getWallClockTime());
+
+ // "state" is a new field in 4.2.
+ if (serverGlobalParams.featureCompatibility.getVersion() <
+ ServerGlobalParams::FeatureCompatibility::Version::kUpgradingTo42) {
+ return newTxnRecord.toBSON();
+ }
+
+ if (entry.isInPendingTransaction()) {
+ invariant(entry.getPrevWriteOpTimeInTransaction()->isNull());
+ newTxnRecord.setState(DurableTxnStateEnum::kInProgress);
+ newTxnRecord.setStartOpTime(entry.getOpTime());
+ return newTxnRecord.toBSON();
+ }
+ switch (entry.getCommandType()) {
+ case repl::OplogEntry::CommandType::kApplyOps:
+ if (entry.shouldPrepare()) {
+ newTxnRecord.setState(DurableTxnStateEnum::kPrepared);
+ newTxnRecord.setStartOpTime(entry.getOpTime());
+ } else {
+ newTxnRecord.setState(DurableTxnStateEnum::kCommitted);
+ }
+ break;
+ case repl::OplogEntry::CommandType::kPrepareTransaction:
+ newTxnRecord.setState(DurableTxnStateEnum::kPrepared);
+ if (entry.getPrevWriteOpTimeInTransaction()->isNull()) {
+ // The 'prepareTransaction' entry is the first operation of the transaction.
+ newTxnRecord.setStartOpTime(entry.getOpTime());
+ } else {
+ // Update the transaction record using $set to avoid overwriting the
+ // startOpTime.
+ return BSON("$set" << newTxnRecord.toBSON());
+ }
+ break;
+ case repl::OplogEntry::CommandType::kCommitTransaction:
+ newTxnRecord.setState(DurableTxnStateEnum::kCommitted);
+ break;
+ case repl::OplogEntry::CommandType::kAbortTransaction:
+ newTxnRecord.setState(DurableTxnStateEnum::kAborted);
+ break;
+ default:
+ break;
+ }
+ return newTxnRecord.toBSON();
+ }();
+
+ return createOplogEntryForTransactionTableUpdate(
+ entry.getOpTime(),
+ updateBSON,
+ BSON(SessionTxnRecord::kSessionIdFieldName << sessionInfo.getSessionId()->toBSON()),
+ *entry.getWallClockTime());
+}
+
} // namespace repl
} // namespace mongo
diff --git a/src/mongo/db/repl/session_update_tracker.h b/src/mongo/db/repl/session_update_tracker.h
index b729a78869f..4c646644cb0 100644
--- a/src/mongo/db/repl/session_update_tracker.h
+++ b/src/mongo/db/repl/session_update_tracker.h
@@ -51,17 +51,18 @@ namespace repl {
class SessionUpdateTracker {
public:
/**
- * Inspects the oplog entry and determines whether this needs to update the session info or
- * flush stored transaction information to oplog writes.
- */
- boost::optional<std::vector<OplogEntry>> updateOrFlush(const OplogEntry& entry);
-
- /**
* Converts all stored transaction infos to oplog writes to config.transactions.
* Can return an empty vector if there is nothing to flush.
*/
std::vector<OplogEntry> flushAll();
+ /**
+ * If 'entry' is part of a transaction, then return an update to the transaction table
+ * corresponding to the operation. Otherwise, inspect the entry to determine whether to buffer
+ * or flush the stored transaction information as part of retryable writes.
+ */
+ boost::optional<std::vector<OplogEntry>> updateSession(const OplogEntry& entry);
+
private:
/**
* Analyzes the given oplog entry and determines which transactions stored so far needs to be
@@ -82,6 +83,19 @@ private:
*/
void _updateSessionInfo(const OplogEntry& entry);
+ /**
+ * Inspects the oplog entry and determines whether this needs to update the session info or
+ * flush stored transaction information to oplog writes.
+ */
+ boost::optional<std::vector<OplogEntry>> _updateOrFlush(const OplogEntry& entry);
+
+ /**
+ * Returns an update to the transaction table generated by a transaction operation. This returns
+ * boost::none if the operation is an inTxn operation that is not the first of the transaction.
+ */
+ boost::optional<OplogEntry> _createTransactionTableUpdateFromTransactionOp(
+ const repl::OplogEntry& entry);
+
LogicalSessionIdMap<OplogEntry> _sessionsToUpdate;
};
diff --git a/src/mongo/db/repl/sync_tail.cpp b/src/mongo/db/repl/sync_tail.cpp
index 68de8ea4e39..37d270817ab 100644
--- a/src/mongo/db/repl/sync_tail.cpp
+++ b/src/mongo/db/repl/sync_tail.cpp
@@ -74,6 +74,7 @@
#include "mongo/db/session_txn_record_gen.h"
#include "mongo/db/stats/timer_stats.h"
#include "mongo/db/transaction_participant.h"
+#include "mongo/db/transaction_participant_gen.h"
#include "mongo/stdx/memory.h"
#include "mongo/util/exit.h"
#include "mongo/util/fail_point_service.h"
@@ -1100,7 +1101,6 @@ Status multiSyncApply(OperationContext* opCtx,
return Status::OK();
}
-
/**
* ops - This only modifies the isForCappedCollection field on each op. It does not alter the ops
* vector in any other way.
@@ -1140,7 +1140,7 @@ void SyncTail::_fillWriterVectors(OperationContext* opCtx,
// We need to track all types of ops, including type 'n' (these are generated from chunk
// migrations).
if (sessionUpdateTracker) {
- if (auto newOplogWrites = sessionUpdateTracker->updateOrFlush(op)) {
+ if (auto newOplogWrites = sessionUpdateTracker->updateSession(op)) {
derivedOps->emplace_back(std::move(*newOplogWrites));
_fillWriterVectors(opCtx, &derivedOps->back(), writerVectors, derivedOps, nullptr);
}
diff --git a/src/mongo/db/repl/sync_tail_test.cpp b/src/mongo/db/repl/sync_tail_test.cpp
index aae7b2932e1..ec850c4a921 100644
--- a/src/mongo/db/repl/sync_tail_test.cpp
+++ b/src/mongo/db/repl/sync_tail_test.cpp
@@ -484,6 +484,21 @@ protected:
gUseMultipleOplogEntryFormatForTransactions = false;
SyncTailTest::tearDown();
}
+
+ void checkTxnTable(const LogicalSessionId& lsid,
+ const TxnNumber& txnNum,
+ const repl::OpTime& expectedOpTime,
+ Date_t expectedWallClock,
+ boost::optional<repl::OpTime> expectedStartOpTime,
+ DurableTxnStateEnum expectedState) {
+ repl::checkTxnTable(_opCtx.get(),
+ lsid,
+ txnNum,
+ expectedOpTime,
+ expectedWallClock,
+ expectedStartOpTime,
+ expectedState);
+ }
};
TEST_F(MultiOplogEntrySyncTailTest, MultiApplyUnpreparedTransactionSeparate) {
@@ -552,13 +567,20 @@ TEST_F(MultiOplogEntrySyncTailTest, MultiApplyUnpreparedTransactionSeparate) {
nullptr, getConsistencyMarkers(), getStorageInterface(), multiSyncApply, writerPool.get());
// Apply a batch with only the first operation. This should result in the first oplog entry
- // being put in the oplog, but with no effect because the operation is part of a pending
- // transaction.
+ // being put in the oplog and updating the transaction table, but not actually being applied
+ // because it is part of a pending transaction.
+ const auto expectedStartOpTime = insertOp1.getOpTime();
ASSERT_OK(syncTail.multiApply(_opCtx.get(), {insertOp1}));
ASSERT_EQ(1U, insertedOplogEntries.size());
ASSERT_BSONOBJ_EQ(insertedOplogEntries.back(), insertOp1.toBSON());
ASSERT_TRUE(insertedDocs1.empty());
ASSERT_TRUE(insertedDocs2.empty());
+ checkTxnTable(lsid,
+ txnNum,
+ insertOp1.getOpTime(),
+ *insertOp1.getWallClockTime(),
+ expectedStartOpTime,
+ DurableTxnStateEnum::kInProgress);
// Apply a batch with only the second operation. This should result in the second oplog entry
// being put in the oplog, but with no effect because the operation is part of a pending
@@ -568,6 +590,14 @@ TEST_F(MultiOplogEntrySyncTailTest, MultiApplyUnpreparedTransactionSeparate) {
ASSERT_BSONOBJ_EQ(insertedOplogEntries.back(), insertOp2.toBSON());
ASSERT_TRUE(insertedDocs1.empty());
ASSERT_TRUE(insertedDocs2.empty());
+ // The transaction table should not have been updated for inTxn operations that are not the
+ // first in a transaction.
+ checkTxnTable(lsid,
+ txnNum,
+ insertOp1.getOpTime(),
+ *insertOp1.getWallClockTime(),
+ expectedStartOpTime,
+ DurableTxnStateEnum::kInProgress);
// Apply a batch with only the commit. This should result in the commit being put in the
// oplog, and the two previous entries being applied.
@@ -576,6 +606,12 @@ TEST_F(MultiOplogEntrySyncTailTest, MultiApplyUnpreparedTransactionSeparate) {
ASSERT_EQ(1U, insertedDocs1.size());
ASSERT_EQ(1U, insertedDocs2.size());
ASSERT_BSONOBJ_EQ(insertedOplogEntries.back(), commitOp.toBSON());
+ checkTxnTable(lsid,
+ txnNum,
+ commitOp.getOpTime(),
+ *commitOp.getWallClockTime(),
+ boost::none,
+ DurableTxnStateEnum::kCommitted);
}
TEST_F(MultiOplogEntrySyncTailTest, MultiApplyUnpreparedTransactionAllAtOnce) {
@@ -657,6 +693,12 @@ TEST_F(MultiOplogEntrySyncTailTest, MultiApplyUnpreparedTransactionAllAtOnce) {
ASSERT_EQ(0U, insertedOplogEntries.size());
ASSERT_EQ(1U, insertedDocs1.size());
ASSERT_EQ(1U, insertedDocs2.size());
+ checkTxnTable(lsid,
+ txnNum,
+ commitOp.getOpTime(),
+ *commitOp.getWallClockTime(),
+ boost::none,
+ DurableTxnStateEnum::kCommitted);
}
TEST_F(MultiOplogEntrySyncTailTest, MultiApplyUnpreparedTransactionTwoBatches) {
@@ -721,10 +763,17 @@ TEST_F(MultiOplogEntrySyncTailTest, MultiApplyUnpreparedTransactionTwoBatches) {
// Insert the first entry in its own batch. This should result in the oplog entry being written
// but the entry should not be applied as it is part of a pending transaction.
+ const auto expectedStartOpTime = insertOps[0].getOpTime();
ASSERT_OK(syncTail.multiApply(_opCtx.get(), {insertOps[0]}));
ASSERT_EQ(1U, insertedOplogEntries.size());
ASSERT_EQ(0U, insertedDocs1.size());
ASSERT_EQ(0U, insertedDocs2.size());
+ checkTxnTable(lsid,
+ txnNum,
+ insertOps[0].getOpTime(),
+ *insertOps[0].getWallClockTime(),
+ expectedStartOpTime,
+ DurableTxnStateEnum::kInProgress);
// Insert the rest of the entries, including the commit. These entries should be added to the
// oplog, and all the entries including the first should be applied.
@@ -733,6 +782,12 @@ TEST_F(MultiOplogEntrySyncTailTest, MultiApplyUnpreparedTransactionTwoBatches) {
ASSERT_EQ(5U, insertedOplogEntries.size());
ASSERT_EQ(3U, insertedDocs1.size());
ASSERT_EQ(1U, insertedDocs2.size());
+ checkTxnTable(lsid,
+ txnNum,
+ commitOp.getOpTime(),
+ *commitOp.getWallClockTime(),
+ boost::none,
+ DurableTxnStateEnum::kCommitted);
// Check docs and ordering of docs in nss1.
// The insert into nss2 is unordered with respect to those.
@@ -850,6 +905,12 @@ TEST_F(MultiOplogEntrySyncTailTest, MultiApplyTwoTransactionsOneBatch) {
ASSERT_EQ(6U, insertedOplogEntries.size());
ASSERT_EQ(4, replOpCounters.getInsert()->load() - insertsBefore);
ASSERT_EQ(4U, insertedDocs1.size());
+ checkTxnTable(lsid,
+ txnNum2,
+ commitOp2.getOpTime(),
+ *commitOp2.getWallClockTime(),
+ boost::none,
+ DurableTxnStateEnum::kCommitted);
// Check docs and ordering of docs in nss1.
ASSERT_BSONOBJ_EQ(insertOps1[0].getObject(), insertedDocs1[0]);
@@ -916,7 +977,14 @@ protected:
_prepareOp->getOpTime());
// This re-parse puts the commit op into a normalized form for comparison.
_commitOp = uassertStatusOK(OplogEntry::parse(_commitOp->toBSON()));
-
+ _abortOp = makeCommandOplogEntryWithSessionInfoAndStmtId({Timestamp(Seconds(1), 4), 1LL},
+ _nss1,
+ BSON("abortTransaction" << 1),
+ _lsid,
+ _txnNum,
+ StmtId(3),
+ _prepareOp->getOpTime());
+ _abortOp = uassertStatusOK(OplogEntry::parse(_abortOp->toBSON()));
_opObserver->onInsertsFn =
[&](OperationContext*, const NamespaceString& nss, const std::vector<BSONObj>& docs) {
stdx::lock_guard<stdx::mutex> lock(_insertMutex);
@@ -940,7 +1008,7 @@ protected:
LogicalSessionId _lsid;
TxnNumber _txnNum;
boost::optional<OplogEntry> _insertOp1, _insertOp2;
- boost::optional<OplogEntry> _prepareOp, _commitOp;
+ boost::optional<OplogEntry> _prepareOp, _commitOp, _abortOp;
std::map<NamespaceString, std::vector<BSONObj>> _insertedDocs;
std::unique_ptr<ThreadPool> _writerPool;
@@ -953,14 +1021,21 @@ TEST_F(MultiOplogEntryPreparedTransactionTest, MultiApplyPreparedTransactionStea
nullptr, getConsistencyMarkers(), getStorageInterface(), multiSyncApply, _writerPool.get());
// Apply a batch with the insert operations. This should result in the oplog entries
- // being put in the oplog, but with no effect because the operation is part of a pending
- // transaction.
+ // being put in the oplog and updating the transaction table, but not actually being applied
+ // because they are part of a pending transaction.
+ const auto expectedStartOpTime = _insertOp1->getOpTime();
ASSERT_OK(syncTail.multiApply(_opCtx.get(), {*_insertOp1, *_insertOp2}));
ASSERT_EQ(2U, oplogDocs().size());
ASSERT_BSONOBJ_EQ(_insertOp1->toBSON(), oplogDocs()[0]);
ASSERT_BSONOBJ_EQ(_insertOp2->toBSON(), oplogDocs()[1]);
ASSERT_TRUE(_insertedDocs[_nss1].empty());
ASSERT_TRUE(_insertedDocs[_nss2].empty());
+ checkTxnTable(_lsid,
+ _txnNum,
+ _insertOp1->getOpTime(),
+ *_insertOp1->getWallClockTime(),
+ expectedStartOpTime,
+ DurableTxnStateEnum::kInProgress);
// Apply a batch with only the prepare. This should result in the prepare being put in the
// oplog, and the two previous entries being applied (but in a transaction).
@@ -969,6 +1044,12 @@ TEST_F(MultiOplogEntryPreparedTransactionTest, MultiApplyPreparedTransactionStea
ASSERT_BSONOBJ_EQ(_prepareOp->toBSON(), oplogDocs().back());
ASSERT_EQ(1U, _insertedDocs[_nss1].size());
ASSERT_EQ(1U, _insertedDocs[_nss2].size());
+ checkTxnTable(_lsid,
+ _txnNum,
+ _prepareOp->getOpTime(),
+ *_prepareOp->getWallClockTime(),
+ expectedStartOpTime,
+ DurableTxnStateEnum::kPrepared);
// Apply a batch with only the commit. This should result in the commit being put in the
// oplog, and the two previous entries being committed.
@@ -976,6 +1057,52 @@ TEST_F(MultiOplogEntryPreparedTransactionTest, MultiApplyPreparedTransactionStea
ASSERT_BSONOBJ_EQ(_commitOp->toBSON(), oplogDocs().back());
ASSERT_EQ(1U, _insertedDocs[_nss1].size());
ASSERT_EQ(1U, _insertedDocs[_nss2].size());
+ checkTxnTable(_lsid,
+ _txnNum,
+ _commitOp->getOpTime(),
+ *_commitOp->getWallClockTime(),
+ boost::none,
+ DurableTxnStateEnum::kCommitted);
+}
+
+TEST_F(MultiOplogEntryPreparedTransactionTest, MultiApplyAbortPreparedTransactionCheckTxnTable) {
+ SyncTail syncTail(
+ nullptr, getConsistencyMarkers(), getStorageInterface(), multiSyncApply, _writerPool.get());
+
+ // Apply a batch with the insert operations. This should result in the oplog entries
+ // being put in the oplog and updating the transaction table, but not actually being applied
+ // because they are part of a pending transaction.
+ const auto expectedStartOpTime = _insertOp1->getOpTime();
+ ASSERT_OK(syncTail.multiApply(_opCtx.get(), {*_insertOp1, *_insertOp2}));
+ checkTxnTable(_lsid,
+ _txnNum,
+ _insertOp1->getOpTime(),
+ *_insertOp1->getWallClockTime(),
+ expectedStartOpTime,
+ DurableTxnStateEnum::kInProgress);
+
+ // Apply a batch with only the prepare. This should result in the prepare being put in the
+ // oplog, and the two previous entries being applied (but in a transaction).
+ ASSERT_OK(syncTail.multiApply(_opCtx.get(), {*_prepareOp}));
+ checkTxnTable(_lsid,
+ _txnNum,
+ _prepareOp->getOpTime(),
+ *_prepareOp->getWallClockTime(),
+ expectedStartOpTime,
+ DurableTxnStateEnum::kPrepared);
+
+ // Apply a batch with only the abort. This should result in the abort being put in the
+ // oplog and the transaction table being updated accordingly.
+ ASSERT_OK(syncTail.multiApply(_opCtx.get(), {*_abortOp}));
+ ASSERT_BSONOBJ_EQ(_abortOp->toBSON(), oplogDocs().back());
+ ASSERT_EQ(1U, _insertedDocs[_nss1].size());
+ ASSERT_EQ(1U, _insertedDocs[_nss2].size());
+ checkTxnTable(_lsid,
+ _txnNum,
+ _abortOp->getOpTime(),
+ *_abortOp->getWallClockTime(),
+ boost::none,
+ DurableTxnStateEnum::kAborted);
}
TEST_F(MultiOplogEntryPreparedTransactionTest, MultiApplyPreparedTransactionInitialSync) {
@@ -987,14 +1114,21 @@ TEST_F(MultiOplogEntryPreparedTransactionTest, MultiApplyPreparedTransactionInit
SyncTailTest::makeInitialSyncOptions());
// Apply a batch with the insert operations. This should result in the oplog entries
- // being put in the oplog, but with no effect because the operation is part of a pending
- // transaction.
+ // being put in the oplog and updating the transaction table, but not actually being applied
+ // because they are part of a pending transaction.
+ const auto expectedStartOpTime = _insertOp1->getOpTime();
ASSERT_OK(syncTail.multiApply(_opCtx.get(), {*_insertOp1, *_insertOp2}));
ASSERT_EQ(2U, oplogDocs().size());
ASSERT_BSONOBJ_EQ(_insertOp1->toBSON(), oplogDocs()[0]);
ASSERT_BSONOBJ_EQ(_insertOp2->toBSON(), oplogDocs()[1]);
ASSERT_TRUE(_insertedDocs[_nss1].empty());
ASSERT_TRUE(_insertedDocs[_nss2].empty());
+ checkTxnTable(_lsid,
+ _txnNum,
+ _insertOp1->getOpTime(),
+ *_insertOp1->getWallClockTime(),
+ expectedStartOpTime,
+ DurableTxnStateEnum::kInProgress);
// Apply a batch with only the prepare. This should result in the prepare being put in the
// oplog, but, since this is initial sync, nothing else.
@@ -1003,6 +1137,12 @@ TEST_F(MultiOplogEntryPreparedTransactionTest, MultiApplyPreparedTransactionInit
ASSERT_BSONOBJ_EQ(_prepareOp->toBSON(), oplogDocs().back());
ASSERT_TRUE(_insertedDocs[_nss1].empty());
ASSERT_TRUE(_insertedDocs[_nss2].empty());
+ checkTxnTable(_lsid,
+ _txnNum,
+ _prepareOp->getOpTime(),
+ *_prepareOp->getWallClockTime(),
+ expectedStartOpTime,
+ DurableTxnStateEnum::kPrepared);
// Apply a batch with only the commit. This should result in the commit being put in the
// oplog, and the two previous entries being applied.
@@ -1010,6 +1150,12 @@ TEST_F(MultiOplogEntryPreparedTransactionTest, MultiApplyPreparedTransactionInit
ASSERT_BSONOBJ_EQ(_commitOp->toBSON(), oplogDocs().back());
ASSERT_EQ(1U, _insertedDocs[_nss1].size());
ASSERT_EQ(1U, _insertedDocs[_nss2].size());
+ checkTxnTable(_lsid,
+ _txnNum,
+ _commitOp->getOpTime(),
+ *_commitOp->getWallClockTime(),
+ boost::none,
+ DurableTxnStateEnum::kCommitted);
}
TEST_F(MultiOplogEntryPreparedTransactionTest, MultiApplyPreparedTransactionRecovery) {
@@ -1033,16 +1179,29 @@ TEST_F(MultiOplogEntryPreparedTransactionTest, MultiApplyPreparedTransactionReco
// Apply a batch with the insert operations. This should have no effect, because this is
// recovery.
+ const auto expectedStartOpTime = _insertOp1->getOpTime();
ASSERT_OK(syncTail.multiApply(_opCtx.get(), {*_insertOp1, *_insertOp2}));
ASSERT_TRUE(oplogDocs().empty());
ASSERT_TRUE(_insertedDocs[_nss1].empty());
ASSERT_TRUE(_insertedDocs[_nss2].empty());
+ checkTxnTable(_lsid,
+ _txnNum,
+ _insertOp1->getOpTime(),
+ *_insertOp1->getWallClockTime(),
+ expectedStartOpTime,
+ DurableTxnStateEnum::kInProgress);
// Apply a batch with only the prepare. This should have no effect, since this is recovery.
ASSERT_OK(syncTail.multiApply(_opCtx.get(), {*_prepareOp}));
ASSERT_TRUE(oplogDocs().empty());
ASSERT_TRUE(_insertedDocs[_nss1].empty());
ASSERT_TRUE(_insertedDocs[_nss2].empty());
+ checkTxnTable(_lsid,
+ _txnNum,
+ _prepareOp->getOpTime(),
+ *_prepareOp->getWallClockTime(),
+ expectedStartOpTime,
+ DurableTxnStateEnum::kPrepared);
// Apply a batch with only the commit. This should result in the the two previous entries being
// applied.
@@ -1050,6 +1209,12 @@ TEST_F(MultiOplogEntryPreparedTransactionTest, MultiApplyPreparedTransactionReco
ASSERT_TRUE(oplogDocs().empty());
ASSERT_EQ(1U, _insertedDocs[_nss1].size());
ASSERT_EQ(1U, _insertedDocs[_nss2].size());
+ checkTxnTable(_lsid,
+ _txnNum,
+ _commitOp->getOpTime(),
+ *_commitOp->getWallClockTime(),
+ boost::none,
+ DurableTxnStateEnum::kCommitted);
}
void testWorkerMultikeyPaths(OperationContext* opCtx,
@@ -2232,18 +2397,13 @@ public:
invariant(sessionInfo.getSessionId());
invariant(sessionInfo.getTxnNumber());
- DBDirectClient client(_opCtx.get());
- auto result = client.findOne(
- NamespaceString::kSessionTransactionsTableNamespace.ns(),
- {BSON(SessionTxnRecord::kSessionIdFieldName << sessionInfo.getSessionId()->toBSON())});
- ASSERT_FALSE(result.isEmpty());
-
- auto txnRecord =
- SessionTxnRecord::parse(IDLParserErrorContext("parse txn record for test"), result);
-
- ASSERT_EQ(*sessionInfo.getTxnNumber(), txnRecord.getTxnNum());
- ASSERT_EQ(expectedOpTime, txnRecord.getLastWriteOpTime());
- ASSERT_EQ(expectedWallClock, txnRecord.getLastWriteDate());
+ repl::checkTxnTable(_opCtx.get(),
+ *sessionInfo.getSessionId(),
+ *sessionInfo.getTxnNumber(),
+ expectedOpTime,
+ expectedWallClock,
+ {},
+ {});
}
static const NamespaceString& nss() {
diff --git a/src/mongo/db/repl/sync_tail_test_fixture.cpp b/src/mongo/db/repl/sync_tail_test_fixture.cpp
index 7811b128fc8..9e9b5ed6fab 100644
--- a/src/mongo/db/repl/sync_tail_test_fixture.cpp
+++ b/src/mongo/db/repl/sync_tail_test_fixture.cpp
@@ -34,6 +34,7 @@
#include "mongo/db/catalog/document_validation.h"
#include "mongo/db/curop.h"
#include "mongo/db/db_raii.h"
+#include "mongo/db/dbdirectclient.h"
#include "mongo/db/op_observer_registry.h"
#include "mongo/db/repl/drop_pending_collection_reaper.h"
#include "mongo/db/repl/replication_consistency_markers_mock.h"
@@ -248,6 +249,34 @@ Status SyncTailTest::runOpsInitialSync(std::vector<OplogEntry> ops) {
return Status::OK();
}
+void checkTxnTable(OperationContext* opCtx,
+ const LogicalSessionId& lsid,
+ const TxnNumber& txnNum,
+ const repl::OpTime& expectedOpTime,
+ Date_t expectedWallClock,
+ boost::optional<repl::OpTime> expectedStartOpTime,
+ boost::optional<DurableTxnStateEnum> expectedState) {
+ DBDirectClient client(opCtx);
+ auto result = client.findOne(NamespaceString::kSessionTransactionsTableNamespace.ns(),
+ {BSON(SessionTxnRecord::kSessionIdFieldName << lsid.toBSON())});
+ ASSERT_FALSE(result.isEmpty());
+
+ auto txnRecord =
+ SessionTxnRecord::parse(IDLParserErrorContext("parse txn record for test"), result);
+
+ ASSERT_EQ(txnNum, txnRecord.getTxnNum());
+ ASSERT_EQ(expectedOpTime, txnRecord.getLastWriteOpTime());
+ ASSERT_EQ(expectedWallClock, txnRecord.getLastWriteDate());
+ if (expectedStartOpTime) {
+ ASSERT(txnRecord.getStartOpTime());
+ ASSERT_EQ(*expectedStartOpTime, *txnRecord.getStartOpTime());
+ } else {
+ ASSERT(!txnRecord.getStartOpTime());
+ }
+ if (expectedState) {
+ ASSERT(*expectedState == txnRecord.getState());
+ }
+}
} // namespace repl
} // namespace mongo
diff --git a/src/mongo/db/repl/sync_tail_test_fixture.h b/src/mongo/db/repl/sync_tail_test_fixture.h
index d34191857be..a47c1071891 100644
--- a/src/mongo/db/repl/sync_tail_test_fixture.h
+++ b/src/mongo/db/repl/sync_tail_test_fixture.h
@@ -31,10 +31,12 @@
#include "mongo/base/status.h"
#include "mongo/db/concurrency/lock_manager_defs.h"
+#include "mongo/db/logical_session_id.h"
#include "mongo/db/op_observer_noop.h"
#include "mongo/db/repl/replication_consistency_markers.h"
#include "mongo/db/repl/sync_tail.h"
#include "mongo/db/service_context_d_test_fixture.h"
+#include "mongo/db/session_txn_record_gen.h"
namespace mongo {
@@ -151,5 +153,13 @@ Status failedApplyCommand(OperationContext* opCtx,
const BSONObj& theOperation,
OplogApplication::Mode);
+void checkTxnTable(OperationContext* opCtx,
+ const LogicalSessionId& lsid,
+ const TxnNumber& txnNum,
+ const repl::OpTime& expectedOpTime,
+ Date_t expectedWallClock,
+ boost::optional<repl::OpTime> expectedStartOpTime,
+ boost::optional<DurableTxnStateEnum> expectedState);
+
} // namespace repl
} // namespace mongo