diff options
-rw-r--r-- | src/mongo/db/repl/session_update_tracker.cpp | 167 | ||||
-rw-r--r-- | src/mongo/db/repl/session_update_tracker.h | 26 | ||||
-rw-r--r-- | src/mongo/db/repl/sync_tail.cpp | 4 | ||||
-rw-r--r-- | src/mongo/db/repl/sync_tail_test.cpp | 200 | ||||
-rw-r--r-- | src/mongo/db/repl/sync_tail_test_fixture.cpp | 29 | ||||
-rw-r--r-- | src/mongo/db/repl/sync_tail_test_fixture.h | 10 |
6 files changed, 367 insertions, 69 deletions
diff --git a/src/mongo/db/repl/session_update_tracker.cpp b/src/mongo/db/repl/session_update_tracker.cpp index 28a78ae7449..66132aebdde 100644 --- a/src/mongo/db/repl/session_update_tracker.cpp +++ b/src/mongo/db/repl/session_update_tracker.cpp @@ -38,6 +38,7 @@ #include "mongo/db/server_options.h" #include "mongo/db/session.h" #include "mongo/db/session_txn_record_gen.h" +#include "mongo/db/transaction_participant_gen.h" #include "mongo/util/assert_util.h" #include "mongo/util/log.h" @@ -46,6 +47,33 @@ namespace repl { namespace { /** + * Creates an oplog entry to perform an update on the transaction table. + */ +OplogEntry createOplogEntryForTransactionTableUpdate(repl::OpTime opTime, + const BSONObj& updateBSON, + const BSONObj& o2Field, + Date_t wallClockTime) { + return repl::OplogEntry(opTime, + boost::none, // hash + repl::OpTypeEnum::kUpdate, + NamespaceString::kSessionTransactionsTableNamespace, + boost::none, // uuid + false, // fromMigrate + repl::OplogEntry::kOplogVersion, + updateBSON, + o2Field, + {}, // sessionInfo + true, // upsert + wallClockTime, + boost::none, // statementId + boost::none, // prevWriteOpTime + boost::none, // preImangeOpTime + boost::none, // postImageOpTime + boost::none // prepare + ); +} + +/** * Constructs a new oplog entry if the given entry has transaction state embedded within in. The new * oplog entry will contain the operation needed to replicate the transaction table. * @@ -69,56 +97,35 @@ boost::optional<repl::OplogEntry> createMatchingTransactionTableUpdate( newTxnRecord.setLastWriteOpTime(entry.getOpTime()); newTxnRecord.setLastWriteDate(*entry.getWallClockTime()); - // "state" is a new field in 4.2. - if (serverGlobalParams.featureCompatibility.getVersion() >= - ServerGlobalParams::FeatureCompatibility::Version::kUpgradingTo42) { - switch (entry.getCommandType()) { - case repl::OplogEntry::CommandType::kApplyOps: - if (entry.shouldPrepare()) { - newTxnRecord.setState(DurableTxnStateEnum::kPrepared); - newTxnRecord.setStartOpTime(entry.getOpTime()); - } else { - newTxnRecord.setState(DurableTxnStateEnum::kCommitted); - } - break; - case repl::OplogEntry::CommandType::kCommitTransaction: - newTxnRecord.setState(DurableTxnStateEnum::kCommitted); - break; - case repl::OplogEntry::CommandType::kAbortTransaction: - newTxnRecord.setState(DurableTxnStateEnum::kAborted); - break; - default: - break; - } - } - return newTxnRecord.toBSON(); }(); - return repl::OplogEntry( + return createOplogEntryForTransactionTableUpdate( entry.getOpTime(), - boost::none, // hash - repl::OpTypeEnum::kUpdate, - NamespaceString::kSessionTransactionsTableNamespace, - boost::none, // uuid - false, // fromMigrate - repl::OplogEntry::kOplogVersion, updateBSON, BSON(SessionTxnRecord::kSessionIdFieldName << sessionInfo.getSessionId()->toBSON()), - {}, // sessionInfo - true, // upsert - *entry.getWallClockTime(), - boost::none, // statementId - boost::none, // prevWriteOpTime - boost::none, // preImangeOpTime - boost::none, // postImageOpTime - boost::none // prepare - ); + *entry.getWallClockTime()); +} + +/** + * Returns true if the oplog entry represents an operation in a transaction and false otherwise. + */ +bool isTransactionEntry(OplogEntry entry) { + auto sessionInfo = entry.getOperationSessionInfo(); + if (!sessionInfo.getTxnNumber()) { + return false; + } + + return entry.isInPendingTransaction() || + entry.getCommandType() == repl::OplogEntry::CommandType::kPrepareTransaction || + entry.getCommandType() == repl::OplogEntry::CommandType::kAbortTransaction || + entry.getCommandType() == repl::OplogEntry::CommandType::kCommitTransaction || + entry.getCommandType() == repl::OplogEntry::CommandType::kApplyOps; } } // namespace -boost::optional<std::vector<OplogEntry>> SessionUpdateTracker::updateOrFlush( +boost::optional<std::vector<OplogEntry>> SessionUpdateTracker::_updateOrFlush( const OplogEntry& entry) { const auto& ns = entry.getNss(); @@ -131,6 +138,16 @@ boost::optional<std::vector<OplogEntry>> SessionUpdateTracker::updateOrFlush( return boost::none; } +boost::optional<std::vector<OplogEntry>> SessionUpdateTracker::updateSession( + const OplogEntry& entry) { + if (!isTransactionEntry(entry)) { + return _updateOrFlush(entry); + } + auto txnTableUpdate = _createTransactionTableUpdateFromTransactionOp(entry); + return (txnTableUpdate) ? boost::optional<std::vector<OplogEntry>>({*txnTableUpdate}) + : boost::none; +} + void SessionUpdateTracker::_updateSessionInfo(const OplogEntry& entry) { const auto& sessionInfo = entry.getOperationSessionInfo(); @@ -204,7 +221,6 @@ std::vector<OplogEntry> SessionUpdateTracker::flushAll() { invariant(newUpdate); opList.push_back(std::move(*newUpdate)); } - _sessionsToUpdate.clear(); return opList; @@ -229,5 +245,74 @@ std::vector<OplogEntry> SessionUpdateTracker::_flushForQueryPredicate( return opList; } +boost::optional<OplogEntry> SessionUpdateTracker::_createTransactionTableUpdateFromTransactionOp( + const repl::OplogEntry& entry) { + auto sessionInfo = entry.getOperationSessionInfo(); + + // We only update the transaction table on the first inTxn operation. + if (entry.isInPendingTransaction() && !entry.getPrevWriteOpTimeInTransaction()->isNull()) { + return boost::none; + } + invariant(sessionInfo.getSessionId()); + invariant(entry.getWallClockTime()); + + const auto updateBSON = [&] { + SessionTxnRecord newTxnRecord; + newTxnRecord.setSessionId(*sessionInfo.getSessionId()); + newTxnRecord.setTxnNum(*sessionInfo.getTxnNumber()); + newTxnRecord.setLastWriteOpTime(entry.getOpTime()); + newTxnRecord.setLastWriteDate(*entry.getWallClockTime()); + + // "state" is a new field in 4.2. + if (serverGlobalParams.featureCompatibility.getVersion() < + ServerGlobalParams::FeatureCompatibility::Version::kUpgradingTo42) { + return newTxnRecord.toBSON(); + } + + if (entry.isInPendingTransaction()) { + invariant(entry.getPrevWriteOpTimeInTransaction()->isNull()); + newTxnRecord.setState(DurableTxnStateEnum::kInProgress); + newTxnRecord.setStartOpTime(entry.getOpTime()); + return newTxnRecord.toBSON(); + } + switch (entry.getCommandType()) { + case repl::OplogEntry::CommandType::kApplyOps: + if (entry.shouldPrepare()) { + newTxnRecord.setState(DurableTxnStateEnum::kPrepared); + newTxnRecord.setStartOpTime(entry.getOpTime()); + } else { + newTxnRecord.setState(DurableTxnStateEnum::kCommitted); + } + break; + case repl::OplogEntry::CommandType::kPrepareTransaction: + newTxnRecord.setState(DurableTxnStateEnum::kPrepared); + if (entry.getPrevWriteOpTimeInTransaction()->isNull()) { + // The 'prepareTransaction' entry is the first operation of the transaction. + newTxnRecord.setStartOpTime(entry.getOpTime()); + } else { + // Update the transaction record using $set to avoid overwriting the + // startOpTime. + return BSON("$set" << newTxnRecord.toBSON()); + } + break; + case repl::OplogEntry::CommandType::kCommitTransaction: + newTxnRecord.setState(DurableTxnStateEnum::kCommitted); + break; + case repl::OplogEntry::CommandType::kAbortTransaction: + newTxnRecord.setState(DurableTxnStateEnum::kAborted); + break; + default: + break; + } + return newTxnRecord.toBSON(); + }(); + + return createOplogEntryForTransactionTableUpdate( + entry.getOpTime(), + updateBSON, + BSON(SessionTxnRecord::kSessionIdFieldName << sessionInfo.getSessionId()->toBSON()), + *entry.getWallClockTime()); +} + } // namespace repl } // namespace mongo diff --git a/src/mongo/db/repl/session_update_tracker.h b/src/mongo/db/repl/session_update_tracker.h index b729a78869f..4c646644cb0 100644 --- a/src/mongo/db/repl/session_update_tracker.h +++ b/src/mongo/db/repl/session_update_tracker.h @@ -51,17 +51,18 @@ namespace repl { class SessionUpdateTracker { public: /** - * Inspects the oplog entry and determines whether this needs to update the session info or - * flush stored transaction information to oplog writes. - */ - boost::optional<std::vector<OplogEntry>> updateOrFlush(const OplogEntry& entry); - - /** * Converts all stored transaction infos to oplog writes to config.transactions. * Can return an empty vector if there is nothing to flush. */ std::vector<OplogEntry> flushAll(); + /** + * If 'entry' is part of a transaction, then return an update to the transaction table + * corresponding to the operation. Otherwise, inspect the entry to determine whether to buffer + * or flush the stored transaction information as part of retryable writes. + */ + boost::optional<std::vector<OplogEntry>> updateSession(const OplogEntry& entry); + private: /** * Analyzes the given oplog entry and determines which transactions stored so far needs to be @@ -82,6 +83,19 @@ private: */ void _updateSessionInfo(const OplogEntry& entry); + /** + * Inspects the oplog entry and determines whether this needs to update the session info or + * flush stored transaction information to oplog writes. + */ + boost::optional<std::vector<OplogEntry>> _updateOrFlush(const OplogEntry& entry); + + /** + * Returns an update to the transaction table generated by a transaction operation. This returns + * boost::none if the operation is an inTxn operation that is not the first of the transaction. + */ + boost::optional<OplogEntry> _createTransactionTableUpdateFromTransactionOp( + const repl::OplogEntry& entry); + LogicalSessionIdMap<OplogEntry> _sessionsToUpdate; }; diff --git a/src/mongo/db/repl/sync_tail.cpp b/src/mongo/db/repl/sync_tail.cpp index 68de8ea4e39..37d270817ab 100644 --- a/src/mongo/db/repl/sync_tail.cpp +++ b/src/mongo/db/repl/sync_tail.cpp @@ -74,6 +74,7 @@ #include "mongo/db/session_txn_record_gen.h" #include "mongo/db/stats/timer_stats.h" #include "mongo/db/transaction_participant.h" +#include "mongo/db/transaction_participant_gen.h" #include "mongo/stdx/memory.h" #include "mongo/util/exit.h" #include "mongo/util/fail_point_service.h" @@ -1100,7 +1101,6 @@ Status multiSyncApply(OperationContext* opCtx, return Status::OK(); } - /** * ops - This only modifies the isForCappedCollection field on each op. It does not alter the ops * vector in any other way. @@ -1140,7 +1140,7 @@ void SyncTail::_fillWriterVectors(OperationContext* opCtx, // We need to track all types of ops, including type 'n' (these are generated from chunk // migrations). if (sessionUpdateTracker) { - if (auto newOplogWrites = sessionUpdateTracker->updateOrFlush(op)) { + if (auto newOplogWrites = sessionUpdateTracker->updateSession(op)) { derivedOps->emplace_back(std::move(*newOplogWrites)); _fillWriterVectors(opCtx, &derivedOps->back(), writerVectors, derivedOps, nullptr); } diff --git a/src/mongo/db/repl/sync_tail_test.cpp b/src/mongo/db/repl/sync_tail_test.cpp index aae7b2932e1..ec850c4a921 100644 --- a/src/mongo/db/repl/sync_tail_test.cpp +++ b/src/mongo/db/repl/sync_tail_test.cpp @@ -484,6 +484,21 @@ protected: gUseMultipleOplogEntryFormatForTransactions = false; SyncTailTest::tearDown(); } + + void checkTxnTable(const LogicalSessionId& lsid, + const TxnNumber& txnNum, + const repl::OpTime& expectedOpTime, + Date_t expectedWallClock, + boost::optional<repl::OpTime> expectedStartOpTime, + DurableTxnStateEnum expectedState) { + repl::checkTxnTable(_opCtx.get(), + lsid, + txnNum, + expectedOpTime, + expectedWallClock, + expectedStartOpTime, + expectedState); + } }; TEST_F(MultiOplogEntrySyncTailTest, MultiApplyUnpreparedTransactionSeparate) { @@ -552,13 +567,20 @@ TEST_F(MultiOplogEntrySyncTailTest, MultiApplyUnpreparedTransactionSeparate) { nullptr, getConsistencyMarkers(), getStorageInterface(), multiSyncApply, writerPool.get()); // Apply a batch with only the first operation. This should result in the first oplog entry - // being put in the oplog, but with no effect because the operation is part of a pending - // transaction. + // being put in the oplog and updating the transaction table, but not actually being applied + // because they are part of a pending transaction. + const auto expectedStartOpTime = insertOp1.getOpTime(); ASSERT_OK(syncTail.multiApply(_opCtx.get(), {insertOp1})); ASSERT_EQ(1U, insertedOplogEntries.size()); ASSERT_BSONOBJ_EQ(insertedOplogEntries.back(), insertOp1.toBSON()); ASSERT_TRUE(insertedDocs1.empty()); ASSERT_TRUE(insertedDocs2.empty()); + checkTxnTable(lsid, + txnNum, + insertOp1.getOpTime(), + *insertOp1.getWallClockTime(), + expectedStartOpTime, + DurableTxnStateEnum::kInProgress); // Apply a batch with only the second operation. This should result in the second oplog entry // being put in the oplog, but with no effect because the operation is part of a pending @@ -568,6 +590,14 @@ TEST_F(MultiOplogEntrySyncTailTest, MultiApplyUnpreparedTransactionSeparate) { ASSERT_BSONOBJ_EQ(insertedOplogEntries.back(), insertOp2.toBSON()); ASSERT_TRUE(insertedDocs1.empty()); ASSERT_TRUE(insertedDocs2.empty()); + // The transaction table should not have been updated for inTxn operations that are not the + // first in a transaction. + checkTxnTable(lsid, + txnNum, + insertOp1.getOpTime(), + *insertOp1.getWallClockTime(), + expectedStartOpTime, + DurableTxnStateEnum::kInProgress); // Apply a batch with only the commit. This should result in the commit being put in the // oplog, and the two previous entries being applied. @@ -576,6 +606,12 @@ TEST_F(MultiOplogEntrySyncTailTest, MultiApplyUnpreparedTransactionSeparate) { ASSERT_EQ(1U, insertedDocs1.size()); ASSERT_EQ(1U, insertedDocs2.size()); ASSERT_BSONOBJ_EQ(insertedOplogEntries.back(), commitOp.toBSON()); + checkTxnTable(lsid, + txnNum, + commitOp.getOpTime(), + *commitOp.getWallClockTime(), + boost::none, + DurableTxnStateEnum::kCommitted); } TEST_F(MultiOplogEntrySyncTailTest, MultiApplyUnpreparedTransactionAllAtOnce) { @@ -657,6 +693,12 @@ TEST_F(MultiOplogEntrySyncTailTest, MultiApplyUnpreparedTransactionAllAtOnce) { ASSERT_EQ(0U, insertedOplogEntries.size()); ASSERT_EQ(1U, insertedDocs1.size()); ASSERT_EQ(1U, insertedDocs2.size()); + checkTxnTable(lsid, + txnNum, + commitOp.getOpTime(), + *commitOp.getWallClockTime(), + boost::none, + DurableTxnStateEnum::kCommitted); } TEST_F(MultiOplogEntrySyncTailTest, MultiApplyUnpreparedTransactionTwoBatches) { @@ -721,10 +763,17 @@ TEST_F(MultiOplogEntrySyncTailTest, MultiApplyUnpreparedTransactionTwoBatches) { // Insert the first entry in its own batch. This should result in the oplog entry being written // but the entry should not be applied as it is part of a pending transaction. + const auto expectedStartOpTime = insertOps[0].getOpTime(); ASSERT_OK(syncTail.multiApply(_opCtx.get(), {insertOps[0]})); ASSERT_EQ(1U, insertedOplogEntries.size()); ASSERT_EQ(0U, insertedDocs1.size()); ASSERT_EQ(0U, insertedDocs2.size()); + checkTxnTable(lsid, + txnNum, + insertOps[0].getOpTime(), + *insertOps[0].getWallClockTime(), + expectedStartOpTime, + DurableTxnStateEnum::kInProgress); // Insert the rest of the entries, including the commit. These entries should be added to the // oplog, and all the entries including the first should be applied. @@ -733,6 +782,12 @@ TEST_F(MultiOplogEntrySyncTailTest, MultiApplyUnpreparedTransactionTwoBatches) { ASSERT_EQ(5U, insertedOplogEntries.size()); ASSERT_EQ(3U, insertedDocs1.size()); ASSERT_EQ(1U, insertedDocs2.size()); + checkTxnTable(lsid, + txnNum, + commitOp.getOpTime(), + *commitOp.getWallClockTime(), + boost::none, + DurableTxnStateEnum::kCommitted); // Check docs and ordering of docs in nss1. // The insert into nss2 is unordered with respect to those. @@ -850,6 +905,12 @@ TEST_F(MultiOplogEntrySyncTailTest, MultiApplyTwoTransactionsOneBatch) { ASSERT_EQ(6U, insertedOplogEntries.size()); ASSERT_EQ(4, replOpCounters.getInsert()->load() - insertsBefore); ASSERT_EQ(4U, insertedDocs1.size()); + checkTxnTable(lsid, + txnNum2, + commitOp2.getOpTime(), + *commitOp2.getWallClockTime(), + boost::none, + DurableTxnStateEnum::kCommitted); // Check docs and ordering of docs in nss1. ASSERT_BSONOBJ_EQ(insertOps1[0].getObject(), insertedDocs1[0]); @@ -916,7 +977,14 @@ protected: _prepareOp->getOpTime()); // This re-parse puts the commit op into a normalized form for comparison. _commitOp = uassertStatusOK(OplogEntry::parse(_commitOp->toBSON())); - + _abortOp = makeCommandOplogEntryWithSessionInfoAndStmtId({Timestamp(Seconds(1), 4), 1LL}, + _nss1, + BSON("abortTransaction" << 1), + _lsid, + _txnNum, + StmtId(3), + _prepareOp->getOpTime()); + _abortOp = uassertStatusOK(OplogEntry::parse(_abortOp->toBSON())); _opObserver->onInsertsFn = [&](OperationContext*, const NamespaceString& nss, const std::vector<BSONObj>& docs) { stdx::lock_guard<stdx::mutex> lock(_insertMutex); @@ -940,7 +1008,7 @@ protected: LogicalSessionId _lsid; TxnNumber _txnNum; boost::optional<OplogEntry> _insertOp1, _insertOp2; - boost::optional<OplogEntry> _prepareOp, _commitOp; + boost::optional<OplogEntry> _prepareOp, _commitOp, _abortOp; std::map<NamespaceString, std::vector<BSONObj>> _insertedDocs; std::unique_ptr<ThreadPool> _writerPool; @@ -953,14 +1021,21 @@ TEST_F(MultiOplogEntryPreparedTransactionTest, MultiApplyPreparedTransactionStea nullptr, getConsistencyMarkers(), getStorageInterface(), multiSyncApply, _writerPool.get()); // Apply a batch with the insert operations. This should result in the oplog entries - // being put in the oplog, but with no effect because the operation is part of a pending - // transaction. + // being put in the oplog and updating the transaction table, but not actually being applied + // because they are part of a pending transaction. + const auto expectedStartOpTime = _insertOp1->getOpTime(); ASSERT_OK(syncTail.multiApply(_opCtx.get(), {*_insertOp1, *_insertOp2})); ASSERT_EQ(2U, oplogDocs().size()); ASSERT_BSONOBJ_EQ(_insertOp1->toBSON(), oplogDocs()[0]); ASSERT_BSONOBJ_EQ(_insertOp2->toBSON(), oplogDocs()[1]); ASSERT_TRUE(_insertedDocs[_nss1].empty()); ASSERT_TRUE(_insertedDocs[_nss2].empty()); + checkTxnTable(_lsid, + _txnNum, + _insertOp1->getOpTime(), + *_insertOp1->getWallClockTime(), + expectedStartOpTime, + DurableTxnStateEnum::kInProgress); // Apply a batch with only the prepare. This should result in the prepare being put in the // oplog, and the two previous entries being applied (but in a transaction). @@ -969,6 +1044,12 @@ TEST_F(MultiOplogEntryPreparedTransactionTest, MultiApplyPreparedTransactionStea ASSERT_BSONOBJ_EQ(_prepareOp->toBSON(), oplogDocs().back()); ASSERT_EQ(1U, _insertedDocs[_nss1].size()); ASSERT_EQ(1U, _insertedDocs[_nss2].size()); + checkTxnTable(_lsid, + _txnNum, + _prepareOp->getOpTime(), + *_prepareOp->getWallClockTime(), + expectedStartOpTime, + DurableTxnStateEnum::kPrepared); // Apply a batch with only the commit. This should result in the commit being put in the // oplog, and the two previous entries being committed. @@ -976,6 +1057,52 @@ TEST_F(MultiOplogEntryPreparedTransactionTest, MultiApplyPreparedTransactionStea ASSERT_BSONOBJ_EQ(_commitOp->toBSON(), oplogDocs().back()); ASSERT_EQ(1U, _insertedDocs[_nss1].size()); ASSERT_EQ(1U, _insertedDocs[_nss2].size()); + checkTxnTable(_lsid, + _txnNum, + _commitOp->getOpTime(), + *_commitOp->getWallClockTime(), + boost::none, + DurableTxnStateEnum::kCommitted); +} + +TEST_F(MultiOplogEntryPreparedTransactionTest, MultiApplyAbortPreparedTransactionCheckTxnTable) { + SyncTail syncTail( + nullptr, getConsistencyMarkers(), getStorageInterface(), multiSyncApply, _writerPool.get()); + + // Apply a batch with the insert operations. This should result in the oplog entries + // being put in the oplog and updating the transaction table, but not actually being applied + // because they are part of a pending transaction. + const auto expectedStartOpTime = _insertOp1->getOpTime(); + ASSERT_OK(syncTail.multiApply(_opCtx.get(), {*_insertOp1, *_insertOp2})); + checkTxnTable(_lsid, + _txnNum, + _insertOp1->getOpTime(), + *_insertOp1->getWallClockTime(), + expectedStartOpTime, + DurableTxnStateEnum::kInProgress); + + // Apply a batch with only the prepare. This should result in the prepare being put in the + // oplog, and the two previous entries being applied (but in a transaction). + ASSERT_OK(syncTail.multiApply(_opCtx.get(), {*_prepareOp})); + checkTxnTable(_lsid, + _txnNum, + _prepareOp->getOpTime(), + *_prepareOp->getWallClockTime(), + expectedStartOpTime, + DurableTxnStateEnum::kPrepared); + + // Apply a batch with only the abort. This should result in the abort being put in the + // oplog and the transaction table being updated accordingly. + ASSERT_OK(syncTail.multiApply(_opCtx.get(), {*_abortOp})); + ASSERT_BSONOBJ_EQ(_abortOp->toBSON(), oplogDocs().back()); + ASSERT_EQ(1U, _insertedDocs[_nss1].size()); + ASSERT_EQ(1U, _insertedDocs[_nss2].size()); + checkTxnTable(_lsid, + _txnNum, + _abortOp->getOpTime(), + *_abortOp->getWallClockTime(), + boost::none, + DurableTxnStateEnum::kAborted); } TEST_F(MultiOplogEntryPreparedTransactionTest, MultiApplyPreparedTransactionInitialSync) { @@ -987,14 +1114,21 @@ TEST_F(MultiOplogEntryPreparedTransactionTest, MultiApplyPreparedTransactionInit SyncTailTest::makeInitialSyncOptions()); // Apply a batch with the insert operations. This should result in the oplog entries - // being put in the oplog, but with no effect because the operation is part of a pending - // transaction. + // being put in the oplog and updating the transaction table, but not actually being applied + // because they are part of a pending transaction. + const auto expectedStartOpTime = _insertOp1->getOpTime(); ASSERT_OK(syncTail.multiApply(_opCtx.get(), {*_insertOp1, *_insertOp2})); ASSERT_EQ(2U, oplogDocs().size()); ASSERT_BSONOBJ_EQ(_insertOp1->toBSON(), oplogDocs()[0]); ASSERT_BSONOBJ_EQ(_insertOp2->toBSON(), oplogDocs()[1]); ASSERT_TRUE(_insertedDocs[_nss1].empty()); ASSERT_TRUE(_insertedDocs[_nss2].empty()); + checkTxnTable(_lsid, + _txnNum, + _insertOp1->getOpTime(), + *_insertOp1->getWallClockTime(), + expectedStartOpTime, + DurableTxnStateEnum::kInProgress); // Apply a batch with only the prepare. This should result in the prepare being put in the // oplog, but, since this is initial sync, nothing else. @@ -1003,6 +1137,12 @@ TEST_F(MultiOplogEntryPreparedTransactionTest, MultiApplyPreparedTransactionInit ASSERT_BSONOBJ_EQ(_prepareOp->toBSON(), oplogDocs().back()); ASSERT_TRUE(_insertedDocs[_nss1].empty()); ASSERT_TRUE(_insertedDocs[_nss2].empty()); + checkTxnTable(_lsid, + _txnNum, + _prepareOp->getOpTime(), + *_prepareOp->getWallClockTime(), + expectedStartOpTime, + DurableTxnStateEnum::kPrepared); // Apply a batch with only the commit. This should result in the commit being put in the // oplog, and the two previous entries being applied. @@ -1010,6 +1150,12 @@ TEST_F(MultiOplogEntryPreparedTransactionTest, MultiApplyPreparedTransactionInit ASSERT_BSONOBJ_EQ(_commitOp->toBSON(), oplogDocs().back()); ASSERT_EQ(1U, _insertedDocs[_nss1].size()); ASSERT_EQ(1U, _insertedDocs[_nss2].size()); + checkTxnTable(_lsid, + _txnNum, + _commitOp->getOpTime(), + *_commitOp->getWallClockTime(), + boost::none, + DurableTxnStateEnum::kCommitted); } TEST_F(MultiOplogEntryPreparedTransactionTest, MultiApplyPreparedTransactionRecovery) { @@ -1033,16 +1179,29 @@ TEST_F(MultiOplogEntryPreparedTransactionTest, MultiApplyPreparedTransactionReco // Apply a batch with the insert operations. This should have no effect, because this is // recovery. + const auto expectedStartOpTime = _insertOp1->getOpTime(); ASSERT_OK(syncTail.multiApply(_opCtx.get(), {*_insertOp1, *_insertOp2})); ASSERT_TRUE(oplogDocs().empty()); ASSERT_TRUE(_insertedDocs[_nss1].empty()); ASSERT_TRUE(_insertedDocs[_nss2].empty()); + checkTxnTable(_lsid, + _txnNum, + _insertOp1->getOpTime(), + *_insertOp1->getWallClockTime(), + expectedStartOpTime, + DurableTxnStateEnum::kInProgress); // Apply a batch with only the prepare. This should have no effect, since this is recovery. ASSERT_OK(syncTail.multiApply(_opCtx.get(), {*_prepareOp})); ASSERT_TRUE(oplogDocs().empty()); ASSERT_TRUE(_insertedDocs[_nss1].empty()); ASSERT_TRUE(_insertedDocs[_nss2].empty()); + checkTxnTable(_lsid, + _txnNum, + _prepareOp->getOpTime(), + *_prepareOp->getWallClockTime(), + expectedStartOpTime, + DurableTxnStateEnum::kPrepared); // Apply a batch with only the commit. This should result in the the two previous entries being // applied. @@ -1050,6 +1209,12 @@ TEST_F(MultiOplogEntryPreparedTransactionTest, MultiApplyPreparedTransactionReco ASSERT_TRUE(oplogDocs().empty()); ASSERT_EQ(1U, _insertedDocs[_nss1].size()); ASSERT_EQ(1U, _insertedDocs[_nss2].size()); + checkTxnTable(_lsid, + _txnNum, + _commitOp->getOpTime(), + *_commitOp->getWallClockTime(), + boost::none, + DurableTxnStateEnum::kCommitted); } void testWorkerMultikeyPaths(OperationContext* opCtx, @@ -2232,18 +2397,13 @@ public: invariant(sessionInfo.getSessionId()); invariant(sessionInfo.getTxnNumber()); - DBDirectClient client(_opCtx.get()); - auto result = client.findOne( - NamespaceString::kSessionTransactionsTableNamespace.ns(), - {BSON(SessionTxnRecord::kSessionIdFieldName << sessionInfo.getSessionId()->toBSON())}); - ASSERT_FALSE(result.isEmpty()); - - auto txnRecord = - SessionTxnRecord::parse(IDLParserErrorContext("parse txn record for test"), result); - - ASSERT_EQ(*sessionInfo.getTxnNumber(), txnRecord.getTxnNum()); - ASSERT_EQ(expectedOpTime, txnRecord.getLastWriteOpTime()); - ASSERT_EQ(expectedWallClock, txnRecord.getLastWriteDate()); + repl::checkTxnTable(_opCtx.get(), + *sessionInfo.getSessionId(), + *sessionInfo.getTxnNumber(), + expectedOpTime, + expectedWallClock, + {}, + {}); } static const NamespaceString& nss() { diff --git a/src/mongo/db/repl/sync_tail_test_fixture.cpp b/src/mongo/db/repl/sync_tail_test_fixture.cpp index 7811b128fc8..9e9b5ed6fab 100644 --- a/src/mongo/db/repl/sync_tail_test_fixture.cpp +++ b/src/mongo/db/repl/sync_tail_test_fixture.cpp @@ -34,6 +34,7 @@ #include "mongo/db/catalog/document_validation.h" #include "mongo/db/curop.h" #include "mongo/db/db_raii.h" +#include "mongo/db/dbdirectclient.h" #include "mongo/db/op_observer_registry.h" #include "mongo/db/repl/drop_pending_collection_reaper.h" #include "mongo/db/repl/replication_consistency_markers_mock.h" @@ -248,6 +249,34 @@ Status SyncTailTest::runOpsInitialSync(std::vector<OplogEntry> ops) { return Status::OK(); } +void checkTxnTable(OperationContext* opCtx, + const LogicalSessionId& lsid, + const TxnNumber& txnNum, + const repl::OpTime& expectedOpTime, + Date_t expectedWallClock, + boost::optional<repl::OpTime> expectedStartOpTime, + boost::optional<DurableTxnStateEnum> expectedState) { + DBDirectClient client(opCtx); + auto result = client.findOne(NamespaceString::kSessionTransactionsTableNamespace.ns(), + {BSON(SessionTxnRecord::kSessionIdFieldName << lsid.toBSON())}); + ASSERT_FALSE(result.isEmpty()); + + auto txnRecord = + SessionTxnRecord::parse(IDLParserErrorContext("parse txn record for test"), result); + + ASSERT_EQ(txnNum, txnRecord.getTxnNum()); + ASSERT_EQ(expectedOpTime, txnRecord.getLastWriteOpTime()); + ASSERT_EQ(expectedWallClock, txnRecord.getLastWriteDate()); + if (expectedStartOpTime) { + ASSERT(txnRecord.getStartOpTime()); + ASSERT_EQ(*expectedStartOpTime, *txnRecord.getStartOpTime()); + } else { + ASSERT(!txnRecord.getStartOpTime()); + } + if (expectedState) { + ASSERT(*expectedState == txnRecord.getState()); + } +} } // namespace repl } // namespace mongo diff --git a/src/mongo/db/repl/sync_tail_test_fixture.h b/src/mongo/db/repl/sync_tail_test_fixture.h index d34191857be..a47c1071891 100644 --- a/src/mongo/db/repl/sync_tail_test_fixture.h +++ b/src/mongo/db/repl/sync_tail_test_fixture.h @@ -31,10 +31,12 @@ #include "mongo/base/status.h" #include "mongo/db/concurrency/lock_manager_defs.h" +#include "mongo/db/logical_session_id.h" #include "mongo/db/op_observer_noop.h" #include "mongo/db/repl/replication_consistency_markers.h" #include "mongo/db/repl/sync_tail.h" #include "mongo/db/service_context_d_test_fixture.h" +#include "mongo/db/session_txn_record_gen.h" namespace mongo { @@ -151,5 +153,13 @@ Status failedApplyCommand(OperationContext* opCtx, const BSONObj& theOperation, OplogApplication::Mode); +void checkTxnTable(OperationContext* opCtx, + const LogicalSessionId& lsid, + const TxnNumber& txnNum, + const repl::OpTime& expectedOpTime, + Date_t expectedWallClock, + boost::optional<repl::OpTime> expectedStartOpTime, + boost::optional<DurableTxnStateEnum> expectedState); + } // namespace repl } // namespace mongo |