diff options
author | Daniel Gottlieb <daniel.gottlieb@mongodb.com> | 2023-02-06 10:17:24 -0500 |
---|---|---|
committer | Evergreen Agent <no-reply@evergreen.mongodb.com> | 2023-02-06 16:59:56 +0000 |
commit | a7b6c12cd6975e17148a7effa7d964936619f0b5 (patch) | |
tree | a73a072c7d6c5e53177944d92b0968f5ce455365 | |
parent | 39ddb8c2c82900ccd4452928d9fb0c8f22b35e3a (diff) | |
download | mongo-a7b6c12cd6975e17148a7effa7d964936619f0b5.tar.gz |
SERVER-72744: Introduce code path for a primary to commit a split prepared transaction.
-rw-r--r-- | src/mongo/db/repl/mock_repl_coord_server_fixture.h | 2 | ||||
-rw-r--r-- | src/mongo/db/repl/replication_coordinator_mock.cpp | 10 | ||||
-rw-r--r-- | src/mongo/db/repl/replication_coordinator_mock.h | 10 | ||||
-rw-r--r-- | src/mongo/db/transaction/SConscript | 4 | ||||
-rw-r--r-- | src/mongo/db/transaction/transaction_participant.cpp | 85 | ||||
-rw-r--r-- | src/mongo/db/transaction/transaction_participant.h | 14 | ||||
-rw-r--r-- | src/mongo/db/transaction/transaction_participant_test.cpp | 341 |
7 files changed, 433 insertions, 33 deletions
diff --git a/src/mongo/db/repl/mock_repl_coord_server_fixture.h b/src/mongo/db/repl/mock_repl_coord_server_fixture.h index e0859800f63..6fa3f076851 100644 --- a/src/mongo/db/repl/mock_repl_coord_server_fixture.h +++ b/src/mongo/db/repl/mock_repl_coord_server_fixture.h @@ -42,7 +42,7 @@ class StorageInterfaceMock; } // namespace repl /** - * This is a basic fixture that is backed by an ephemeral storage engine and a mock replication + * This is a basic fixture that is backed by a real storage engine and a mock replication * coordinator that is running as primary. */ class MockReplCoordServerFixture : public ServiceContextMongoDTest { diff --git a/src/mongo/db/repl/replication_coordinator_mock.cpp b/src/mongo/db/repl/replication_coordinator_mock.cpp index b31f3757586..b556ccd3ca3 100644 --- a/src/mongo/db/repl/replication_coordinator_mock.cpp +++ b/src/mongo/db/repl/replication_coordinator_mock.cpp @@ -244,11 +244,13 @@ void ReplicationCoordinatorMock::_setMyLastAppliedOpTimeAndWallTime( _myLastAppliedOpTime = opTimeAndWallTime.opTime; _myLastAppliedWallTime = opTimeAndWallTime.wallTime; - _setCurrentCommittedSnapshotOpTime(lk, opTimeAndWallTime.opTime); + if (_updateCommittedSnapshot) { + _setCurrentCommittedSnapshotOpTime(lk, opTimeAndWallTime.opTime); - if (auto storageEngine = _service->getStorageEngine()) { - if (auto snapshotManager = storageEngine->getSnapshotManager()) { - snapshotManager->setCommittedSnapshot(opTimeAndWallTime.opTime.getTimestamp()); + if (auto storageEngine = _service->getStorageEngine()) { + if (auto snapshotManager = storageEngine->getSnapshotManager()) { + snapshotManager->setCommittedSnapshot(opTimeAndWallTime.opTime.getTimestamp()); + } } } } diff --git a/src/mongo/db/repl/replication_coordinator_mock.h b/src/mongo/db/repl/replication_coordinator_mock.h index a5ee67ef28f..ef43e450400 100644 --- a/src/mongo/db/repl/replication_coordinator_mock.h +++ b/src/mongo/db/repl/replication_coordinator_mock.h @@ -423,6 +423,15 @@ public: virtual SplitPrepareSessionManager* getSplitPrepareSessionManager() override; + /** + * If this is true, the mock will update the "committed snapshot" everytime the "last applied" + * is updated. That behavior can be disabled for tests that need more control over what's + * majority committed. + */ + void setUpdateCommittedSnapshot(bool val) { + _updateCommittedSnapshot = val; + } + private: void _setMyLastAppliedOpTimeAndWallTime(WithLock lk, const OpTimeAndWallTime& opTimeAndWallTime); @@ -454,6 +463,7 @@ private: bool _canAcceptNonLocalWrites = false; SplitPrepareSessionManager _splitSessionManager; + bool _updateCommittedSnapshot = true; }; } // namespace repl diff --git a/src/mongo/db/transaction/SConscript b/src/mongo/db/transaction/SConscript index ec8eadcd64a..210440097fc 100644 --- a/src/mongo/db/transaction/SConscript +++ b/src/mongo/db/transaction/SConscript @@ -100,7 +100,10 @@ env.CppUnitTest( ], LIBDEPS=[ '$BUILD_DIR/mongo/db/auth/authmocks', + '$BUILD_DIR/mongo/db/dbhelpers', '$BUILD_DIR/mongo/db/op_observer/op_observer', + '$BUILD_DIR/mongo/db/op_observer/op_observer_impl', + '$BUILD_DIR/mongo/db/op_observer/oplog_writer_impl', '$BUILD_DIR/mongo/db/repl/image_collection_entry', '$BUILD_DIR/mongo/db/repl/mock_repl_coord_server_fixture', '$BUILD_DIR/mongo/db/repl/replica_set_aware_service', @@ -110,6 +113,7 @@ env.CppUnitTest( '$BUILD_DIR/mongo/db/session/session_catalog', '$BUILD_DIR/mongo/db/session/session_catalog_mongod', '$BUILD_DIR/mongo/db/stats/transaction_stats', + '$BUILD_DIR/mongo/db/storage/storage_control', 'transaction', 'transaction_api', 'transaction_operations', diff --git a/src/mongo/db/transaction/transaction_participant.cpp b/src/mongo/db/transaction/transaction_participant.cpp index e615798495f..b408894fcff 100644 --- a/src/mongo/db/transaction/transaction_participant.cpp +++ b/src/mongo/db/transaction/transaction_participant.cpp @@ -1913,7 +1913,6 @@ void TransactionParticipant::Participant::commitPreparedTransaction( // Once entering "committing with prepare" we cannot throw an exception. // TODO (SERVER-71610): Fix to be interruptible or document exception. UninterruptibleLockGuard noInterrupt(opCtx->lockState()); // NOLINT. - opCtx->recoveryUnit()->setCommitTimestamp(commitTimestamp); // On secondary, we generate a fake empty oplog slot, since it's not used by opObserver. OplogSlot commitOplogSlot; @@ -1946,13 +1945,28 @@ void TransactionParticipant::Participant::commitPreparedTransaction( // entry. invariant(!o().lastWriteOpTime.isNull()); - // If commitOplogEntryOpTime is a nullopt, then we grab the OpTime from the commitOplogSlot - // which will only be set if we are primary. Otherwise, the commitOplogEntryOpTime must have - // been passed in during secondary oplog application. auto commitOplogSlotOpTime = commitOplogEntryOpTime.value_or(commitOplogSlot); - opCtx->recoveryUnit()->setDurableTimestamp(commitOplogSlotOpTime.getTimestamp()); - - _commitStorageTransaction(opCtx); + auto* splitPrepareManager = + repl::ReplicationCoordinator::get(opCtx)->getSplitPrepareSessionManager(); + if (opCtx->writesAreReplicated() && + splitPrepareManager->isSessionSplit( + _sessionId(), o().activeTxnNumberAndRetryCounter.getTxnNumber())) { + // If we are a primary committing a transaction that was split into smaller prepared + // transactions, use a special commit code path. + _commitSplitPreparedTxnOnPrimary(opCtx, + splitPrepareManager, + _sessionId(), + o().activeTxnNumberAndRetryCounter.getTxnNumber(), + commitTimestamp, + commitOplogSlot.getTimestamp()); + } else { + // If commitOplogEntryOpTime is a nullopt, then we grab the OpTime from the + // commitOplogSlot which will only be set if we are primary. Otherwise, the + // commitOplogEntryOpTime must have been passed in during secondary oplog application. + opCtx->recoveryUnit()->setCommitTimestamp(commitTimestamp); + opCtx->recoveryUnit()->setDurableTimestamp(commitOplogSlotOpTime.getTimestamp()); + _commitStorageTransaction(opCtx); + } auto opObserver = opCtx->getServiceContext()->getOpObserver(); invariant(opObserver); @@ -1998,13 +2012,68 @@ void TransactionParticipant::Participant::_commitStorageTransaction(OperationCon opCtx->lockState()->unsetMaxLockTimeout(); } +void TransactionParticipant::Participant::_commitSplitPreparedTxnOnPrimary( + OperationContext* parentOpCtx, + repl::SplitPrepareSessionManager* splitPrepareManager, + const LogicalSessionId& userSessionId, + const TxnNumber& userTxnNumber, + const Timestamp& commitTimestamp, + const Timestamp& durableTimestamp) { + + for (const repl::PooledSession& session : + splitPrepareManager->getSplitSessions(userSessionId, userTxnNumber).get()) { + + auto splitClientOwned = parentOpCtx->getServiceContext()->makeClient("tempSplitClient"); + auto splitOpCtx = splitClientOwned->makeOperationContext(); + AlternativeClientRegion acr(splitClientOwned); + + std::unique_ptr<MongoDSessionCatalog::Session> checkedOutSession; + + repl::UnreplicatedWritesBlock notReplicated(splitOpCtx.get()); + splitOpCtx->setLogicalSessionId(session.getSessionId()); + splitOpCtx->setTxnNumber(session.getTxnNumber()); + splitOpCtx->setInMultiDocumentTransaction(); + + auto mongoDSessionCatalog = MongoDSessionCatalog::get(splitOpCtx.get()); + checkedOutSession = mongoDSessionCatalog->checkOutSession(splitOpCtx.get()); + TransactionParticipant::Participant newTxnParticipant = + TransactionParticipant::get(splitOpCtx.get()); + newTxnParticipant.beginOrContinueTransactionUnconditionally( + splitOpCtx.get(), {*(splitOpCtx->getTxnNumber())}); + newTxnParticipant.unstashTransactionResources(splitOpCtx.get(), "commitTransaction"); + + splitOpCtx->recoveryUnit()->setCommitTimestamp(commitTimestamp); + splitOpCtx->recoveryUnit()->setDurableTimestamp(durableTimestamp); + + { + Lock::GlobalLock rstl(splitOpCtx.get(), LockMode::MODE_IX); + newTxnParticipant._commitStorageTransaction(splitOpCtx.get()); + auto operationCount = newTxnParticipant.p().transactionOperations.numOperations(); + auto oplogOperationBytes = + newTxnParticipant.p().transactionOperations.getTotalOperationBytes(); + newTxnParticipant.clearOperationsInMemory(splitOpCtx.get()); + + newTxnParticipant._finishCommitTransaction( + splitOpCtx.get(), operationCount, oplogOperationBytes); + } + + newTxnParticipant.stashTransactionResources(splitOpCtx.get()); + checkedOutSession->checkIn(splitOpCtx.get(), OperationContextSession::CheckInReason::kDone); + } + + parentOpCtx->recoveryUnit()->setCommitTimestamp(commitTimestamp); + parentOpCtx->recoveryUnit()->setDurableTimestamp(durableTimestamp); + this->_commitStorageTransaction(parentOpCtx); +} + void TransactionParticipant::Participant::_finishCommitTransaction( OperationContext* opCtx, size_t operationCount, size_t oplogOperationBytes) noexcept { { auto tickSource = opCtx->getServiceContext()->getTickSource(); stdx::lock_guard<Client> lk(*opCtx->getClient()); o(lk).txnState.transitionTo(TransactionState::kCommitted); - + // Features such as the "split prepared transaction" optimization will not attribute + // transaction metrics to the internal sessions. o(lk).transactionMetricsObserver.onCommit(opCtx, ServerTransactionsMetrics::get(opCtx), tickSource, diff --git a/src/mongo/db/transaction/transaction_participant.h b/src/mongo/db/transaction/transaction_participant.h index a945034c98f..152637db865 100644 --- a/src/mongo/db/transaction/transaction_participant.h +++ b/src/mongo/db/transaction/transaction_participant.h @@ -840,6 +840,20 @@ public: // This should be called *without* the Client being locked. void _commitStorageTransaction(OperationContext* opCtx); + // Commits a "split prepared" transaction. Prepared transactions processed on secondaries + // may split the storage writes into multiple RecoveryUnits. This method will be invoked by + // a primary such that it looks for all recovery units and commits them such that they + // become visible to snapshot/timestamped readers atomically. + // + // This must be called while an OplogSlot is being held open at or earlier than the input + // `durableTimestamp`. + void _commitSplitPreparedTxnOnPrimary(OperationContext* opCtx, + repl::SplitPrepareSessionManager* splitPrepareManager, + const LogicalSessionId& sessionId, + const TxnNumber& txnNumber, + const Timestamp& commitTimestamp, + const Timestamp& durableTimestamp); + // Stash transaction resources. void _stashActiveTransaction(OperationContext* opCtx); diff --git a/src/mongo/db/transaction/transaction_participant_test.cpp b/src/mongo/db/transaction/transaction_participant_test.cpp index 6adb800b1a2..f38ed96ab36 100644 --- a/src/mongo/db/transaction/transaction_participant_test.cpp +++ b/src/mongo/db/transaction/transaction_participant_test.cpp @@ -32,25 +32,33 @@ #include "mongo/db/concurrency/replication_state_transition_lock_guard.h" #include "mongo/db/db_raii.h" #include "mongo/db/dbdirectclient.h" +#include "mongo/db/dbhelpers.h" +#include "mongo/db/global_settings.h" +#include "mongo/db/op_observer/op_observer_impl.h" #include "mongo/db/op_observer/op_observer_noop.h" #include "mongo/db/op_observer/op_observer_registry.h" +#include "mongo/db/op_observer/oplog_writer_impl.h" #include "mongo/db/operation_context.h" #include "mongo/db/repl/mock_repl_coord_server_fixture.h" #include "mongo/db/repl/oplog.h" #include "mongo/db/repl/oplog_entry.h" #include "mongo/db/repl/optime.h" +#include "mongo/db/repl/replication_coordinator_mock.h" #include "mongo/db/repl/storage_interface_impl.h" #include "mongo/db/repl/storage_interface_mock.h" #include "mongo/db/service_context.h" #include "mongo/db/session/session_catalog.h" #include "mongo/db/session/session_catalog_mongod.h" +#include "mongo/db/session/session_txn_record_gen.h" #include "mongo/db/stats/fill_locker_info.h" +#include "mongo/db/storage/durable_history_pin.h" #include "mongo/db/transaction/server_transactions_metrics.h" #include "mongo/db/transaction/session_catalog_mongod_transaction_interface_impl.h" #include "mongo/db/transaction/transaction_participant.h" #include "mongo/db/transaction/transaction_participant_gen.h" #include "mongo/db/txn_retry_counter_too_old_info.h" #include "mongo/idl/server_parameter_test_util.h" +#include "mongo/logv2/log.h" #include "mongo/stdx/future.h" #include "mongo/unittest/barrier.h" #include "mongo/unittest/death_test.h" @@ -61,6 +69,8 @@ #include "mongo/util/net/socket_utils.h" #include "mongo/util/tick_source_mock.h" +#define MONGO_LOGV2_DEFAULT_COMPONENT ::mongo::logv2::LogComponent::kDefault + namespace mongo { namespace { @@ -261,6 +271,10 @@ protected: TxnParticipantTest(Options options = {}) : MockReplCoordServerFixture(std::move(options)) {} void setUp() override { + repl::ReplSettings replSettings; + replSettings.setReplSetString("realReplicaSet"); + setGlobalReplSettings(replSettings); + MockReplCoordServerFixture::setUp(); const auto service = opCtx()->getServiceContext(); @@ -348,34 +362,43 @@ protected: return opCtxSession; } - void checkOutSessionFromDiferentOpCtx(const LogicalSessionId& lsid, - bool beginOrContinueTxn, - boost::optional<TxnNumber> txnNumber = boost::none, - boost::optional<bool> autocommit = boost::none, - boost::optional<bool> startTransaction = boost::none, - bool commitTxn = false) { + void callUnderSplitSession(const InternalSessionPool::Session& session, + std::function<void(OperationContext* opCtx)> fn) { runFunctionFromDifferentOpCtx([&](OperationContext* opCtx) { - opCtx->setLogicalSessionId(lsid); - if (txnNumber) { - opCtx->setTxnNumber(*txnNumber); - opCtx->setInMultiDocumentTransaction(); - } + // Prepared writes as part of a split session must be done with an + // `UnreplicatedWritesBlock`. This is how we mimic oplog application. + repl::UnreplicatedWritesBlock notReplicated(opCtx); + opCtx->setLogicalSessionId(session.getSessionId()); + opCtx->setTxnNumber(session.getTxnNumber()); + opCtx->setInMultiDocumentTransaction(); auto mongoDSessionCatalog = MongoDSessionCatalog::get(opCtx); - auto opCtxSession = mongoDSessionCatalog->checkOutSession(opCtx); + std::unique_ptr<MongoDSessionCatalog::Session> session = + mongoDSessionCatalog->checkOutSession(opCtx); + auto newTxnParticipant = TransactionParticipant::get(opCtx); + newTxnParticipant.beginOrContinueTransactionUnconditionally(opCtx, + {*(opCtx->getTxnNumber())}); + newTxnParticipant.unstashTransactionResources(opCtx, "crud ops"); - if (beginOrContinueTxn) { - auto txnParticipant = TransactionParticipant::get(opCtx); - txnParticipant.beginOrContinue( - opCtx, {*opCtx->getTxnNumber()}, autocommit, startTransaction); + fn(opCtx); - if (commitTxn) { - txnParticipant.commitUnpreparedTransaction(opCtx); - } - } + newTxnParticipant.stashTransactionResources(opCtx); + session->checkIn(opCtx, OperationContextSession::CheckInReason::kDone); }); } + void assertSessionState(BSONObj obj, DurableTxnStateEnum expectedState) { + SessionTxnRecord txnRecord = + SessionTxnRecord::parse(IDLParserContext("test sessn txn parser"), obj); + ASSERT_EQ(expectedState, txnRecord.getState().get()); + } + + void assertNotInSessionState(BSONObj obj, DurableTxnStateEnum expectedState) { + SessionTxnRecord txnRecord = + SessionTxnRecord::parse(IDLParserContext("test sessn txn parser"), obj); + ASSERT_NE(expectedState, txnRecord.getState().get()); + } + const LogicalSessionId _sessionId{makeLogicalSessionIdForTest()}; const TxnNumber _txnNumber{20}; const UUID _uuid = UUID::gen(); @@ -5886,6 +5909,284 @@ TEST_F(TxnParticipantTest, } } +/** + * RAII type for operating at a timestamp. Will remove any timestamping when the object destructs. + */ +class OneOffRead { +public: + OneOffRead(OperationContext* opCtx, const Timestamp& ts) : _opCtx(opCtx) { + _opCtx->recoveryUnit()->abandonSnapshot(); + if (ts.isNull()) { + _opCtx->recoveryUnit()->setTimestampReadSource(RecoveryUnit::ReadSource::kNoTimestamp); + } else { + _opCtx->recoveryUnit()->setTimestampReadSource(RecoveryUnit::ReadSource::kProvided, ts); + } + } + + ~OneOffRead() { + _opCtx->recoveryUnit()->abandonSnapshot(); + _opCtx->recoveryUnit()->setTimestampReadSource(RecoveryUnit::ReadSource::kNoTimestamp); + } + +private: + OperationContext* _opCtx; +}; + +TEST_F(TxnParticipantTest, SplitTransactionOnPrepare) { + // This test simulates: + // 1) Preparing a transaction with multiple logical sessions as a secondary. + // 2) Committing the transaction as a primary. + // 3) Rolling back the commit. + // + // This test asserts that: + // A) The writes done by both sessions are committed with the same "visible" timestamp. + // B) The writes done by both sessions rollback, due to having a "durable" timestamp that's + // ahead of the stable timestamp. + // C) The `config.transactions` table is not aware that the split sessions were part of a + // prepared transaction. One example of why this invariant is important is because + // reconstructing prepared transactions finds all `config.transactions` entries in the + // prepared state. We must ensure any prepared transaction is only reconstructed once. + // + // First we set up infrastructure such that we can simulate oplog application, primary behaviors + // and `rollbackToStable`. + OperationContext* opCtx = this->opCtx(); + DurableHistoryRegistry::set(opCtx->getServiceContext(), + std::make_unique<DurableHistoryRegistry>()); + opCtx->getServiceContext()->setOpObserver( + std::make_unique<OpObserverImpl>(std::make_unique<OplogWriterImpl>())); + + OpDebug* const nullOpDbg = nullptr; + + dynamic_cast<repl::ReplicationCoordinatorMock*>(repl::ReplicationCoordinator::get(opCtx)) + ->setUpdateCommittedSnapshot(false); + // Initiate the term from 0 to 1 for familiarity. + ASSERT_OK(repl::ReplicationCoordinator::get(opCtx)->updateTerm(opCtx, 1)); + + // Bump the logical clock for easier visual cues. + const Timestamp startTs(100, 1); + auto oplogInfo = LocalOplogInfo::get(opCtx); + oplogInfo->setNewTimestamp(opCtx->getServiceContext(), startTs); + opCtx->getServiceContext()->getStorageEngine()->setInitialDataTimestamp(startTs); + + // Assign the variables that represent the "real", client-facing logical session. + std::unique_ptr<MongoDSessionCatalog::Session> userSession = checkOutSession(); + TransactionParticipant::Participant userTxnParticipant = TransactionParticipant::get(opCtx); + + // TxnResources start in the "stashed" state. + userTxnParticipant.unstashTransactionResources(opCtx, "crud ops"); + + // Hold the collection lock/datastructure such that it can be released prior to rollback. + boost::optional<AutoGetCollection> userColl; + userColl.emplace(opCtx, kNss, LockMode::MODE_IX); + + // We split our user session into 2 split sessions. + const int numSplits = 2; + auto* splitPrepareManager = + repl::ReplicationCoordinator::get(opCtx)->getSplitPrepareSessionManager(); + const std::vector<InternalSessionPool::Session>& splitSessions = + splitPrepareManager->splitSession( + opCtx->getLogicalSessionId().get(), opCtx->getTxnNumber().get(), numSplits); + // Insert an `_id: 1` document. + callUnderSplitSession(splitSessions[0], [nullOpDbg](OperationContext* opCtx) { + AutoGetCollection userColl(opCtx, kNss, LockMode::MODE_IX); + ASSERT_OK( + collection_internal::insertDocument(opCtx, + userColl.getCollection(), + InsertStatement(BSON("_id" << 1 << "value" << 1)), + nullOpDbg)); + }); + + // Insert an `_id: 2` document. + callUnderSplitSession(splitSessions[1], [nullOpDbg](OperationContext* opCtx) { + AutoGetCollection userColl(opCtx, kNss, LockMode::MODE_IX); + ASSERT_OK( + collection_internal::insertDocument(opCtx, + userColl.getCollection(), + InsertStatement(BSON("_id" << 2 << "value" << 1)), + nullOpDbg)); + }); + + // Update `2` to increment its `value` to 2. This must be done in the same split session as the + // insert. + callUnderSplitSession(splitSessions[1], [nullOpDbg](OperationContext* opCtx) { + AutoGetCollection userColl(opCtx, kNss, LockMode::MODE_IX); + Helpers::update( + opCtx, userColl->ns(), BSON("_id" << 2), BSON("$inc" << BSON("value" << 1))); + }); + + + // Mimic the methods to call for a secondary performing a split prepare. Those are called inside + // `UnreplicatedWritesBlock` and explicitly pass in the prepare OpTime. + const Timestamp prepTs = startTs; + const repl::OpTime prepOpTime(prepTs, 1); + callUnderSplitSession(splitSessions[0], [prepOpTime](OperationContext* opCtx) { + auto txnParticipant = TransactionParticipant::get(opCtx); + txnParticipant.prepareTransaction(opCtx, prepOpTime); + }); + callUnderSplitSession(splitSessions[1], [prepOpTime](OperationContext* opCtx) { + auto txnParticipant = TransactionParticipant::get(opCtx); + txnParticipant.prepareTransaction(opCtx, prepOpTime); + }); + + // Normally this would also be called on a secondary under a `UnreplicatedWritesBlock`. However + // we must change the `config.transactions` state for this logical session. In production, that + // transaction table write would come via a synthetic oplog entry. + userTxnParticipant.prepareTransaction(opCtx, prepOpTime); + + // Prepared transactions are dictated a commit/visible from the transaction coordinator. However + // the commit function will assign a "durable" timestamp derived from the system clock. This is + // the code path of a primary. + const Timestamp visibleTs = prepTs + 100; + + // Committing a transaction as a primary does not allow direct control on the oplog/durable + // timestamp being allocated. We want to observe that we correctly set a durable timestamp when + // a primary commits a split prepare. Ultimately we'll observe that by seeing the transaction + // rollback on a call to `rollbackToStable`. We can coerce the durable timestamp by bumping the + // logical clock. Worth noting, the test is reading from the system clock which is expected to + // choose a value for the transaction's durable timestamp that is much much larger than + // `chosenStableTimestamp`. We only bump the clock here in the very unexpected case that the + // system clock is within few hundred seconds of the epoch. + const Timestamp chosenStableTimestamp = visibleTs + 10; + oplogInfo->setNewTimestamp(opCtx->getServiceContext(), chosenStableTimestamp + 1); + userTxnParticipant.commitPreparedTransaction(opCtx, visibleTs, boost::none); + ASSERT_LT(chosenStableTimestamp, userTxnParticipant.getLastWriteOpTime().getTimestamp()); + + // Using the `findOne` helpers by default invariants if no result is found. + const bool invariantOnError = true; + // Print out reads at the interesting times prior to asserting for diagnostics. + for (const auto& ts : std::vector<Timestamp>{prepTs, visibleTs}) { + OneOffRead oor(opCtx, ts); + LOGV2(7274401, + "Pre-rollback values", + "readTimestamp"_attr = ts, + "Doc(_id: 1)"_attr = Helpers::findOneForTesting( + opCtx, userColl->getCollection(), BSON("_id" << 1), !invariantOnError), + "Doc(_id: 2)"_attr = Helpers::findOneForTesting( + opCtx, userColl->getCollection(), BSON("_id" << 2), !invariantOnError)); + } + + // Reading at the prepare timestamp should not see anything. + { + OneOffRead oor(opCtx, prepTs); + opCtx->recoveryUnit()->setPrepareConflictBehavior( + PrepareConflictBehavior::kIgnoreConflicts); + ASSERT_BSONOBJ_EQ( + BSONObj::kEmptyObject, + Helpers::findOneForTesting( + opCtx, userColl->getCollection(), BSON("_id" << 1), !invariantOnError)); + ASSERT_BSONOBJ_EQ( + BSONObj::kEmptyObject, + Helpers::findOneForTesting( + opCtx, userColl->getCollection(), BSON("_id" << 2), !invariantOnError)); + } + + // Reading at the visible/commit timestamp should see the inserted and updated documents. + { + OneOffRead oor(opCtx, visibleTs); + ASSERT_BSONOBJ_EQ( + BSON("_id" << 1 << "value" << 1), + Helpers::findOneForTesting( + opCtx, userColl->getCollection(), BSON("_id" << 1), !invariantOnError)); + ASSERT_BSONOBJ_EQ( + BSON("_id" << 2 << "value" << 2), + Helpers::findOneForTesting( + opCtx, userColl->getCollection(), BSON("_id" << 2), !invariantOnError)); + } + + { + // We also assert that the user transaction record is in the committed state. + AutoGetCollection configTransactions( + opCtx, NamespaceString::kSessionTransactionsTableNamespace, LockMode::MODE_IS); + // The user config.transactions document must exist and must be in the "committed" state. + BSONObj userTxnObj = + Helpers::findOneForTesting(opCtx, + configTransactions.getCollection(), + BSON("_id.id" << opCtx->getLogicalSessionId()->getId()), + !invariantOnError); + ASSERT(!userTxnObj.isEmpty()); + assertSessionState(userTxnObj, DurableTxnStateEnum::kCommitted); + + // Rather than testing the implementation, we'll assert on the weakest necessary state. A + // split `config.transactions` document may or may not exist. If it exists, it must be + // in the "committed" state. + for (auto idx = 0; idx < numSplits; ++idx) { + BSONObj splitTxnObj = Helpers::findOneForTesting( + opCtx, + configTransactions.getCollection(), + BSON("_id.id" << splitSessions[idx].getSessionId().getId()), + !invariantOnError); + if (!splitTxnObj.isEmpty()) { + assertSessionState(splitTxnObj, DurableTxnStateEnum::kCommitted); + } + } + } + + // Unlock the collection and check in the session to release locks. + userColl = boost::none; + userSession.reset(); + + // Rollback such that the commit oplog entry and the effects of the transaction are rolled + // back. + opCtx->getServiceContext()->getStorageEngine()->setStableTimestamp(chosenStableTimestamp); + { + Lock::GlobalLock globalLock(opCtx, LockMode::MODE_X); + ASSERT_OK(opCtx->getServiceContext()->getStorageEngine()->recoverToStableTimestamp(opCtx)); + } + + // Again, display read values for diagnostics. + userColl.emplace(opCtx, kNss, LockMode::MODE_IX); + for (const auto& ts : std::vector<Timestamp>{prepTs, visibleTs}) { + OneOffRead oor(opCtx, ts); + LOGV2(7274402, + "Post-rollback values", + "readTimestamp"_attr = ts, + "Doc(_id: 1)"_attr = Helpers::findOneForTesting( + opCtx, userColl->getCollection(), BSON("_id" << 1), !invariantOnError), + "Doc(_id: 2)"_attr = Helpers::findOneForTesting( + opCtx, userColl->getCollection(), BSON("_id" << 2), !invariantOnError)); + } + + // Now when we read at the commit/visible timestamp, the documents must not exist. + { + OneOffRead oor(opCtx, visibleTs); + ASSERT_BSONOBJ_EQ( + BSONObj::kEmptyObject, + Helpers::findOneForTesting( + opCtx, userColl->getCollection(), BSON("_id" << 1), !invariantOnError)); + ASSERT_BSONOBJ_EQ( + BSONObj::kEmptyObject, + Helpers::findOneForTesting( + opCtx, userColl->getCollection(), BSON("_id" << 2), !invariantOnError)); + } + + // We also assert that the user transaction record is in the prepared state. The split sessions + // are not. + AutoGetCollection configTransactions( + opCtx, NamespaceString::kSessionTransactionsTableNamespace, LockMode::MODE_IS); + // The user `config.transactions` document must be in the "prepared" state. + BSONObj userTxnObj = + Helpers::findOneForTesting(opCtx, + configTransactions.getCollection(), + BSON("_id.id" << opCtx->getLogicalSessionId()->getId()), + !invariantOnError); + ASSERT(!userTxnObj.isEmpty()); + assertSessionState(userTxnObj, DurableTxnStateEnum::kPrepared); + + // Rather than testing the implementation, we'll assert on the weakest necessary state. A split + // `config.transactions` document may or may not exist. If it exists, it must not* be in the + // "prepared" state. + for (auto idx = 0; idx < numSplits; ++idx) { + BSONObj splitTxnObj = + Helpers::findOneForTesting(opCtx, + configTransactions.getCollection(), + BSON("_id.id" << splitSessions[idx].getSessionId().getId()), + !invariantOnError); + if (!splitTxnObj.isEmpty()) { + assertNotInSessionState(splitTxnObj, DurableTxnStateEnum::kPrepared); + } + } +} + bool doesExistInCatalog(const LogicalSessionId& lsid, SessionCatalog* sessionCatalog) { bool existsInCatalog{false}; sessionCatalog->scanSession(lsid, |